In [None]:
import re

import io

import json

import tiktoken

from itertools import cycle

from math import sqrt


from pathlib import Path

from typing import Optional, List, Tuple, Sequence, Any

from pydantic import BaseModel

from azure.core.credentials import AzureKeyCredential

from azure.ai.documentintelligence import DocumentIntelligenceClient

from azure.ai.documentintelligence.models import (
    DocumentAnalysisFeature,
    AnalyzeResult,
    DocumentContentFormat,
    DocumentParagraph,
    DocumentTable,
    DocumentStyle,
    DocumentWord,
    DocumentPage,
)

from app.proposal_object.path import ProposalObjectPaths

from langchain.schema import BaseMessage, HumanMessage, SystemMessage

from app.cost.cost import CostTracker

from app.proposal_object.schemas import Devis

from app.proposal_object.proposal_object_analyzer import ProposalObjectHandler

from config.logger_config import logger

from config.config import env_param


In [24]:
# Shared cost tracker; the OpenAI usage of the section-analysis cell is recorded on it.
cost_tracker = CostTracker()


In [25]:
import os
import warnings

# Azure Document Intelligence settings are read from the environment so that
# no secret is stored in the notebook.
# NOTE(review): the key that used to be hard-coded in this cell has been
# exposed in the notebook history and should be rotated.
AZURE_DI_ENDPOINT = os.environ.get(
    "AZURE_DI_ENDPOINT", "https://cleed-di.cognitiveservices.azure.com/"
)

AZURE_DI_KEY = os.environ.get("AZURE_DI_KEY", "")

if not AZURE_DI_KEY:
    # Warn early instead of failing later with an opaque authentication error.
    warnings.warn(
        "AZURE_DI_KEY is not set; the Document Intelligence request is expected to be rejected."
    )


In [26]:
# Sample quotes grouped by OCR difficulty. Exactly one FILE_NAME assignment is
# active at a time; every other candidate stays commented out so the cell does
# not perform assignments that are immediately overwritten.

### Hard ###
# FILE_NAME = "Devis DE2502-580 - LABOURDETTE - "

# FILE_NAME = "733656-00 SAS LE CARBET LOT B MEN PVC ALU BI 7016 FT"

# FILE_NAME = "SC332-0i24072310090"

FILE_NAME = (
    "DEVIS LOIRE MENUISERIES SERVICES (L_M_S_)_GUERIN - TESSIER - charnieres invisbles"
)


### Medium ###
# FILE_NAME = "Lot Electricité - JUILLARD - Devis_n°OF9206"

# FILE_NAME = "Devis - CHAINE CHAINE DES ARTISANS COTE SUD - DEVIS LARTIZIEN   1"

# FILE_NAME = "Devis_'OF9189'"

# FILE_NAME = "Devis DE2412-461 - GOURDON - "


### Easy ###
# FILE_NAME = "Devis - CHAINE CHAINE DES ARTISANS COTE SUD - Vouland   Seignosse"

# Resolves every input/output path used by the following cells from the file name.
proposal_path = ProposalObjectPaths(file_name=FILE_NAME)


In [27]:
def token_counter(text: str) -> int:
    """Return the number of `cl100k_base` tokens that `text` encodes to."""
    tokens = tiktoken.get_encoding("cl100k_base").encode(text)
    return len(tokens)


In [28]:
# Initialise the Document Intelligence client
client = DocumentIntelligenceClient(
    endpoint=AZURE_DI_ENDPOINT, credential=AzureKeyCredential(AZURE_DI_KEY)
)

# Analyse the document (e.g. "devis.pdf") with the prebuilt-layout model,
# enabling font-style extraction and high-resolution OCR
with open(proposal_path.proposal_path, "rb") as f:
    poller = client.begin_analyze_document(
        model_id="prebuilt-layout",
        body=f,
        features=[
            DocumentAnalysisFeature.STYLE_FONT,
            DocumentAnalysisFeature.OCR_HIGH_RESOLUTION,
        ],
        output_content_format=DocumentContentFormat.MARKDOWN,
    )

analyze_result: AnalyzeResult = poller.result()

# Raw result as JSON. The dump keeps non-ASCII characters (ensure_ascii=False),
# so the file is written explicitly as UTF-8 instead of the platform default.
a_out = Path(proposal_path.proposal_ocr_json_path)

a_out.parent.mkdir(parents=True, exist_ok=True)

a_out.write_text(
    json.dumps(analyze_result.as_dict(), indent=2, ensure_ascii=False),
    encoding="utf-8",
)

print(f"✅ JSON écrit dans {a_out.resolve()}")

# Document content as returned by the service (markdown output format requested above)
a_out = Path(proposal_path.proposal_ocr_content_path)

a_out.parent.mkdir(parents=True, exist_ok=True)

a_out.write_text(analyze_result.content, encoding="utf-8")

print(f"✅ TEXT écrit dans {a_out.resolve()}")


# Elements extracted by the analysis
pages: Optional[List[DocumentPage]] = analyze_result.pages  # list of pages

paragraphs: Optional[List[DocumentParagraph]] = (
    analyze_result.paragraphs
)  # text blocks (paragraphs)

tables: Optional[List[DocumentTable]] = (
    analyze_result.tables
)  # detected tables with their cells

styles: Optional[List[DocumentStyle]] = (
    analyze_result.styles
)  # detected text styles

content: Optional[str] = (
    analyze_result.content
)  # whole document text as a single stream


✅ JSON écrit dans /Users/remillieux/Documents/Kleek/OCR/data/proposal_ocr_json/DEVIS LOIRE MENUISERIES SERVICES (L_M_S_)_GUERIN - TESSIER - charnieres invisbles.json
✅ TEXT écrit dans /Users/remillieux/Documents/Kleek/OCR/data/proposal_ocr_content/DEVIS LOIRE MENUISERIES SERVICES (L_M_S_)_GUERIN - TESSIER - charnieres invisbles.txt


# Get line


In [29]:
class LineStyle(BaseModel):
    """Font attributes of one style entry attached to a table row."""

    # ── raw values ─────────────────────────────────────────────
    font_weight: Optional[str] = None  # "bold" or "normal"
    font_style: Optional[str] = None  # "italic" or "normal"
    font_size: Optional[float] = None
    color: Optional[str] = None
    font_family: Optional[str] = None
    background_color: Optional[str] = None  # optional

    # ── derived flags ──────────────────────────────────────────
    @property
    def is_bold(self) -> bool:
        """True when `font_weight` equals "bold", ignoring case."""
        weight = self.font_weight
        return bool(weight) and weight.lower() == "bold"

    @property
    def is_italic(self) -> bool:
        """True when `font_style` equals "italic", ignoring case."""
        slant = self.font_style
        return bool(slant) and slant.lower() == "italic"


# ────────────────────────────────────────────────────────────────
class Cell(BaseModel):
    # One non-empty table cell as rebuilt by `rebuilt_tables`.
    row_index: int
    column_index: int
    content: str
    # Left-edge x position returned by `first_point_x_pixels` for the first
    # word of the cell (or for the cell polygon when no word was matched).
    offset_px: float
    # Character offset and length of the cell's first span in the document content.
    offset_span: int
    span_len: int
    # Polygon of the cell; `estimate_row_height_px` overwrites it with the
    # polygon of the word it measured.
    polygon: Sequence[float]
    word_used_for_size: Optional[str] = None  # word used to measure the size


class TableRow(BaseModel):
    """One table row together with its cells."""

    row_index: int

    cells: List[Cell]

    # Heading level; `assign_h_levels` sets it from the rounded font size
    # (1 = largest size, 5 = default / everything else).
    level_h: int = 5

    # Styles overlapping the row's cells (`_styles_for_row`), plus one entry
    # appended by `rebuilt_tables` that carries the measured height as font_size.
    line_styles: List[LineStyle] = []


class Table(BaseModel):
    """Rebuilt table: its first span in the document content and its rows."""

    offset_span: int

    len_span: int

    table_rows: List[TableRow]

    # ── maximum offset per column ───────────────────────────────
    def _col_offset_max(self) -> List[float]:
        """Return, for each column index, the largest `offset_px` among its cells.

        Columns that contain no cell get 0.0. The list is padded up to the
        highest column index seen, so a skipped column can no longer shift
        the offsets of the following columns to the wrong index.
        """
        mx: List[Optional[float]] = []
        for row in self.table_rows:
            for c in row.cells:
                i = c.column_index
                if i >= len(mx):
                    mx.extend([None] * (i + 1 - len(mx)))
                current = mx[i]
                mx[i] = c.offset_px if current is None else max(current, c.offset_px)
        return [0.0 if v is None else v for v in mx]

    # ── indent of one cell, in spaces ───────────────────────────
    def _cell_indent(self, cell: Cell, col_offset_max: List[float]) -> int:
        """Indent of `cell`: absolute for column 0, otherwise relative to the
        maximum offset of the previous column (never negative)."""
        i = cell.column_index
        if i == 0:
            return int(cell.offset_px * MULT)
        return int(max(0, (cell.offset_px - col_offset_max[i - 1])) * MULT)

    # ── width (relative indent + text) per column ───────────────
    def _col_width(self, col_offset_max: List[float]) -> List[int]:
        """Return the widest `indent + len(content)` found in each column."""
        w: List[int] = [0] * len(col_offset_max)
        for row in self.table_rows:
            for c in row.cells:
                i = c.column_index
                w[i] = max(w[i], self._cell_indent(c, col_offset_max) + len(c.content))
        return w

    # ── display ─────────────────────────────────────────────────
    def print_table(self) -> None:
        """Print the table, one line per row, with columns joined by `SEP`."""
        col_off = self._col_offset_max()
        col_w = self._col_width(col_off)

        for row in self.table_rows:
            # Columns without a cell in this row stay blank at full width.
            parts = [" " * width for width in col_w]

            for c in row.cells:
                i = c.column_index
                indent = self._cell_indent(c, col_off)
                parts[i] = (" " * indent + c.content).ljust(col_w[i])

            print(SEP.join(parts).rstrip())


# 20 fixed icons to pick from (`assign_icons` cycles through them in order)
ICONS = [
    "-",
    "_",
    "#",
    "•",
    "◦",
    "▪",
    "▫",
    "►",
    "▸",
    "‣",
    "➤",
    "→",
    "⇒",
    "★",
    "☆",
    "✓",
    "✗",
    "■",
    "□",
    "◆",
]


def hex_to_rgb(hex_code: str) -> Optional[tuple[int, int, int]]:
    """Convert a 6-digit hex colour to an (r, g, b) tuple; case-insensitive.

    '#a1b2c3' -> (161, 178, 195). Surrounding whitespace and leading '#'
    characters are ignored.

    Returns None when the remaining text is not exactly six hexadecimal
    digits, instead of raising ValueError on non-hex characters.
    """
    digits = hex_code.strip().lstrip("#")
    if len(digits) != 6 or any(ch not in "0123456789abcdefABCDEF" for ch in digits):
        return None
    r, g, b = (int(digits[i : i + 2], 16) for i in (0, 2, 4))
    return r, g, b


def rgb_distance(c1: tuple[int, int, int], c2: tuple[int, int, int]) -> float:
    """Euclidean distance between two colours in RGB space (0-255)."""
    squared = 0
    for a, b in zip(c1, c2):
        squared += (a - b) ** 2
    return sqrt(squared)


def assign_icons(colors: set[str], threshold: float = 10.0) -> dict[str, str]:
    """
    Map each colour to an icon.

    Colours whose RGB distance to a group's reference colour is <= `threshold`
    share that group's icon. Colours that `hex_to_rgb` cannot parse are left
    out of the result.

    Colours are processed in sorted order so that both the grouping and the
    icon given to each group are the same on every run; iterating the set
    directly made them depend on set iteration order.

    Returns a dict {hex_colour -> icon}.
    """
    # Endless round-robin over the icon list
    icon_pool = cycle(ICONS)

    groups: list[tuple[tuple[int, int, int], str]] = []  # [(reference_rgb, icon)]
    mapping: dict[str, str] = {}

    for hex_code in sorted(colors):
        rgb = hex_to_rgb(hex_code)

        if rgb is None:
            continue
        # Look for an existing group that is close enough
        for ref_rgb, icon in groups:
            if rgb_distance(rgb, ref_rgb) <= threshold:
                mapping[hex_code] = icon
                break
        else:  # no close colour: open a new group with the next icon
            icon = next(icon_pool)
            groups.append((rgb, icon))
            mapping[hex_code] = icon

    return mapping


def _round05(x: float) -> float:  # 12.27 → 12.5   |   11.24 → 11.0
    return round(x * 2) / 2


def _decorate(text: str, *, bold: bool, italic: bool) -> str:
    if bold:
        text = f"**{text}**"
    if italic:
        text = f"__{text}__"
    return text


def _first_cell_text(
    row: TableRow,
    raw: str,
    max_level: int,
    icon_map: dict[str, str] | None = None,
) -> str:
    """Return `raw` decorated for display as the first cell of `row`.

    The text is prefixed with the icon of the first row style whose background
    colour is present in `icon_map` (if any), and marked bold/italic when any
    of the row's styles is bold/italic. `max_level` is not used.
    """
    # -------- optional icon --------
    prefix = ""
    if icon_map:
        prefix = next(
            (
                icon_map[style.background_color] + " "
                for style in row.line_styles
                if style.background_color in icon_map
            ),
            "",
        )

    # -------- bold / italic emphasis --------
    row_is_bold = any(style.is_bold for style in row.line_styles)
    row_is_italic = any(style.is_italic for style in row.line_styles)

    return prefix + _decorate(raw, bold=row_is_bold, italic=row_is_italic)


def _row_style_score(row: TableRow) -> tuple[float, int]:
    """Return (rounded font size, style weight) for `row`.

    The size is the first truthy `font_size` among the row styles (0.0 when
    none), rounded by `_round05`. The weight is +2 when any style is bold and
    -1 when any style is italic.
    """
    row_styles = row.line_styles
    first_size = next((s.font_size for s in row_styles if s.font_size), 0.0)

    weight = 0
    if any(s.is_bold for s in row_styles):
        weight += 2
    if any(s.is_italic for s in row_styles):
        weight -= 1

    return _round05(first_size), weight


def assign_h_levels(tables: list[Table]) -> None:
    """
    Give every row a `level_h` in {1..5} based only on its font size
    (rounded to the nearest 0.5).

    The distinct rounded sizes are ranked in descending order: the largest
    size gets level 1, the next one level 2, and so on. Only the five largest
    sizes get their own level; every other row gets level 5.

    The rounded sizes are kept in a local list parallel to the rows rather
    than stored as a temporary attribute on the pydantic row objects.
    """
    rows = [r for t in tables for r in t.table_rows]

    # Size of each row: first truthy font_size (0.0 when unknown), rounded.
    rounded_sizes = [
        _round05(next((s.font_size for s in r.line_styles if s.font_size), 0.0))
        for r in rows
    ]

    # Distinct sizes, largest first
    sizes_desc = sorted(set(rounded_sizes), reverse=True)

    # Keep at most 5 groups
    size2level = {sz: idx + 1 for idx, sz in enumerate(sizes_desc[:5])}

    for r, size in zip(rows, rounded_sizes):
        r.level_h = size2level.get(size, 5)


def _page_unit(page: DocumentPage) -> str:
    """Return the `unit` attribute of `page` ('pixel' or 'inch')."""
    return page.unit


_METRIC = "bbox"  # <--- changez ici une bonne fois pour toutes
#   (ou passez le paramètre aux fonctions)


def _poly_height_px(
    polygon: Sequence[float], unit: str, *, dpi: int = 96, method: str = _METRIC
) -> float:
    """Hauteur du polygone selon la *métrique choisie*, convertie en px."""
    ys = polygon[1::2]  # on ne garde que les Y
    if not ys:
        return 0.0

    if method == "bbox":  # hauteur complète
        h_raw = max(ys) - min(ys)

    elif method == "mini":  # écart min2 − min1
        ys_sorted = sorted(ys)
        if len(ys_sorted) < 2:
            h_raw = 0.0
        else:
            h_raw = ys_sorted[1] - ys_sorted[0]

    else:
        raise ValueError(f"méthode inconnue : {method}")

    # conversion éventuelle
    if unit == "pixel":
        return h_raw
    if unit == "inch":
        return h_raw * dpi
    raise ValueError(f"Unité inconnue : {unit!r}")


def _intersects(a: tuple[int, int], b: tuple[int, int]) -> bool:
    """Deux intervalles de caractères se chevauchent-ils ?"""
    return not (a[1] <= b[0] or a[0] >= b[1])


def estimate_row_height_px(
    row: TableRow,
    pages: List[DocumentPage],
    *,
    dpi: int = 96,
) -> float:
    """
    Estimate the height of a row in pixels.

    Only the first cell of the row is considered. The first word (in page
    order) whose span overlaps that cell's span is measured with
    `_poly_height_px`; its polygon and text are then recorded on the cell
    (`polygon`, `word_used_for_size`) and its height is returned.

    Pages whose `words` is None or empty are skipped.

    Returns 0.0 when the row has no cell or no word overlaps the first cell.
    """
    for cell in row.cells[:1]:
        cell_range = (cell.offset_span, cell.offset_span + cell.span_len)

        for page in pages:
            for word in page.words or []:
                word_range = (word.span.offset, word.span.offset + word.span.length)
                if not _intersects(word_range, cell_range):
                    continue

                # NOTE(review): the unit is fixed to "inch" whatever the page
                # unit is; confirm this matches the analysed documents.
                h_px = _poly_height_px(word.polygon, unit="inch", dpi=dpi)

                # Side effect: the cell now carries the measured word's
                # polygon instead of its own.
                cell.polygon = word.polygon
                cell.word_used_for_size = word.content

                return h_px

    return 0.0  # no word found


# ────────────────────────────────────────────────────────────────
#  Styles Azure  ➜  LineStyle
# ────────────────────────────────────────────────────────────────
def _to_line_style(s: DocumentStyle) -> LineStyle:
    """Build a `LineStyle` from an Azure style object.

    Each field is read with `getattr` so that a missing attribute yields None;
    where two attribute names are listed, the second is used when the first is
    missing or falsy.
    """

    def pick(*names: str) -> Any:
        # First truthy attribute among `names`, else the value of the last one.
        value = None
        for name in names:
            value = getattr(s, name, None)
            if value:
                break
        return value

    return LineStyle(
        font_size=pick("font_size"),
        font_weight=pick("font_weight"),
        font_style=pick("font_style", "font_style_name"),
        color=pick("color", "font_color"),
        font_family=pick("similar_font_family", "font_family"),
        background_color=pick("backgroundColor", "background_color"),
    )


def _styles_for_row(row: TableRow, styles: list[DocumentStyle]) -> list[LineStyle]:
    """Return one `LineStyle` per style that has a span overlapping a cell of `row`.

    Styles are returned in their original order; a style is included at most
    once even when several of its spans overlap the row.
    """
    cell_ranges = [(c.offset_span, c.offset_span + c.span_len) for c in row.cells]

    def touches_row(span: Any) -> bool:
        lo, hi = span.offset, span.offset + span.length
        return any(hi > start and lo < end for start, end in cell_ranges)

    return [
        _to_line_style(style)
        for style in styles
        if any(touches_row(span) for span in (style.spans or []))
    ]


def first_point_x_pixels(
    polygon: Sequence[float], *, unit: str, dpi: int = 96
) -> float:
    """Return the left edge (smallest X) of a flat polygon, converted to pixels.

    `unit` is "pixel" (value returned unchanged), "inch" (multiplied by `dpi`)
    or "millimeter" (multiplied by `dpi` and divided by 25.4).

    Raises ValueError when the polygon has fewer than two values or when the
    unit is unknown.
    """
    has_point = bool(polygon) and len(polygon) >= 2
    if not has_point:
        raise ValueError("Polygone invalide")

    # Smallest X rather than the first point's X, so vertex order does not matter.
    xs = polygon[::2]
    left_edge = min(xs)

    if unit not in ("pixel", "inch", "millimeter"):
        raise ValueError(f"Unité inconnue : {unit!r}")

    if unit == "pixel":
        return left_edge

    scaled = left_edge * dpi
    return scaled if unit == "inch" else scaled / 25.4


def _span_offset(span: Any) -> int:
    """
    Renvoie l’offset d’un objet span que ce soit :
    • un DocumentSpan SDK  → .offset
    • un dict {"offset": …}
    • un dict {"span": {"offset": …}} (JSON brut d’un word)
    """
    if span is None:
        return -1
    if hasattr(span, "offset"):
        return span.offset
    if isinstance(span, dict):
        if "offset" in span:
            return span["offset"]
        if "span" in span and "offset" in span["span"]:
            return span["span"]["offset"]
    raise ValueError("Span non reconnu")


def _table_col_widths(
    table: Table, col_off: list[float], max_level: int, bg2icon: dict[str, str] | None
) -> list[int]:
    """Return, per column, the widest `indent + len(text)` over all rows.

    Column 0 uses the decorated text from `_first_cell_text` and an absolute
    indent; other columns use the raw content and an indent relative to the
    maximum offset of the previous column (`col_off`).
    """
    widths = [0] * len(col_off)
    for row in table.table_rows:
        for cell in row.cells:
            col = cell.column_index
            if col == 0:
                text = _first_cell_text(row, cell.content, max_level, icon_map=bg2icon)
                indent = int(cell.offset_px * MULT)
            else:
                text = cell.content
                indent = int(max(0, cell.offset_px - col_off[col - 1]) * MULT)
            widths[col] = max(widths[col], indent + len(text))
    return widths


def find_word_after_span(
    pages: List[DocumentPage],
    offset_min: int,
    target: str,
) -> Optional[DocumentWord]:
    """
    Return the first word whose text equals `target`, starting at `offset_min`.

    Pages are visited in order; inside each page the words are sorted by span
    offset, words located before `offset_min` are skipped, and the comparison
    ignores case.

    Parameters
    ----------
    pages : list[DocumentPage]
        Pages whose `words` are searched; pages without words are skipped.
    offset_min : int
        Smallest span offset a word may have to be considered.
    target : str
        Text to look for (e.g. "300", "Chauffe-eau").

    Returns
    -------
    DocumentWord | None
        The matching word, or None when `target` is empty or nothing matches.
    """
    wanted = target.lower()

    # An empty target is treated as "no match": skip the scan entirely.
    if not wanted:
        return None

    for page in pages:
        if not page.words:
            continue

        for word in sorted(page.words, key=lambda w: _span_offset(w.span)):
            # Same accessor as the sort key; a word without span yields -1
            # and is skipped instead of raising.
            if _span_offset(word.span) < offset_min:
                continue

            if word.content.lower() == wanted:
                return word

    return None


def first_token(text: str) -> str:
    """
    Return the first whitespace-delimited chunk of `text` that contains at
    least one letter or digit.

    If the next non-blank character after that chunk is € or $, the result
    runs from the start of the chunk up to and including that symbol.
    Returns '' when no chunk contains a letter or digit.

    Examples
    --------
    >>> first_token("  - E.C. et Qté. :")
    'E.C.'
    >>> first_token(" Qté. en premier")
    'Qté.'
    >>> first_token("= 135,00 € TTC")
    '135,00 €'
    >>> first_token("Total : 99.5$")
    'Total'
    >>> first_token("###")
    ''
    """
    for match in re.finditer(r"\S+", text):
        chunk = match.group(0)

        # Pure punctuation (e.g. "-", "=", "###") is not a token.
        if not any(ch.isalnum() for ch in chunk):
            continue

        # Look past the blanks that follow the chunk for a currency symbol.
        tail = text[match.end() :].lstrip()
        if tail[:1] in ("€", "$"):
            symbol_at = len(text) - len(tail)
            return text[match.start() : symbol_at + 1]

        return chunk

    return ""  # no usable token


def _para_offset(p: DocumentParagraph) -> int:
    """Offset of the paragraph's first span, or 2**31 - 1 when it has no span."""
    if not p.spans:
        return 2**31 - 1
    return p.spans[0].offset


def _overlaps(start: int, length: int, ranges: list[tuple[int, int]]) -> bool:
    end = start + length
    return any(not (end <= lo or start >= hi) for lo, hi in ranges)


MULT = 10  # 1 space = 0.1 px (multiplier turning offset_px into a number of spaces)
SEP = " | "  # column separator


def rebuilt_tables(
    tables: List[DocumentTable], pages: List[DocumentPage], styles: list[DocumentStyle]
) -> Tuple[List[Table], List[tuple[int, int]], dict[str, str]]:
    """Rebuild the OCR tables as `Table` objects and collect their styles.

    For every non-empty cell, the horizontal position is taken from the first
    word of the cell when that word is found in `pages`, otherwise from the
    cell's own polygon. Each row then receives the styles overlapping its
    cells plus one `LineStyle` carrying the height measured by
    `estimate_row_height_px`.

    A new row is opened whenever the cell's `row_index` changes, so a row
    without a column-0 cell is no longer merged into the previous row and a
    table whose first cell is not in column 0 no longer raises. Cells without
    spans or bounding regions fall back to offset 0 / an empty polygon.

    Progress and the collected style values are printed.

    Returns
    -------
    (tables_rebuilt, ranges, bg2icon)
        tables_rebuilt : rebuilt tables that have at least one row.
        ranges : half-open character ranges (start, end) of every table span.
        bg2icon : background colour -> icon mapping from `assign_icons`.
    """
    tables = tables or []

    print("# FOUNDING TABLES SPANS\n")

    print("There are {} tables".format(len(tables)))

    tables_rebuilt: List[Table] = []

    ranges: List[tuple[int, int]] = []

    for table in tables:
        table_spans = getattr(table, "spans", None) or []
        table_cells = table.cells or []

        table_rebuilt = Table(
            table_rows=[],
            offset_span=table_spans[0].offset if table_spans else 0,
            len_span=table_spans[0].length if table_spans else 0,
        )

        first_table_word = table_cells[0].content if table_cells else "No cells"

        for span in table_spans:
            ranges.append((span.offset, span.offset + span.length))

            print(
                "Table spans: {} - {} [{}]".format(
                    span.offset, span.offset + span.length, first_table_word
                ),
            )

        print(f"\t- Nombre lignes: {table.row_count}")

        print(f"\t- Nombre colonnes: {table.column_count}")

        for cell in table_cells:
            rows = table_rebuilt.table_rows

            # Open a new row when the row index changes.
            if not rows or rows[-1].row_index != cell.row_index:
                rows.append(TableRow(row_index=cell.row_index, cells=[]))

            # Empty cells are not kept.
            if not cell.content:
                continue

            cell_offset = cell.spans[0].offset if cell.spans else 0
            cell_length = cell.spans[0].length if cell.spans else 0
            cell_polygon = (
                cell.bounding_regions[0].polygon if cell.bounding_regions else []
            )

            first_cell_word_text: str = first_token(cell.content)

            first_cell_word_object: Optional[DocumentWord] = find_word_after_span(
                pages,
                offset_min=cell_offset,
                target=first_cell_word_text,
            )

            if first_cell_word_object is not None:
                # Preferred source: left edge of the cell's first word.
                first_position_x = first_point_x_pixels(
                    first_cell_word_object.polygon, unit="pixel", dpi=96
                )
                position_label: object = first_position_x
            else:
                # Fallback: left edge of the cell itself (0.0 without polygon).
                first_position_x = (
                    first_point_x_pixels(cell_polygon, unit="pixel", dpi=96)
                    if cell_polygon
                    else 0.0
                )
                position_label = "NONE"

            print(
                f"\t\t- {cell.row_index}: {cell.content} [{position_label} px] ({first_cell_word_text}, span {cell_offset})"
            )

            rows[-1].cells.append(
                Cell(
                    row_index=cell.row_index,
                    column_index=cell.column_index,
                    content=cell.content,
                    offset_span=cell_offset,
                    span_len=cell_length,
                    offset_px=first_position_x,
                    polygon=cell_polygon,
                )
            )

        for row in table_rebuilt.table_rows:
            row.line_styles = _styles_for_row(row, styles)

        if table_rebuilt.table_rows:
            tables_rebuilt.append(table_rebuilt)

    # Measured height of each row, stored as an extra style entry.
    for table in tables_rebuilt:
        for row in table.table_rows:
            row_font_pt = estimate_row_height_px(row=row, pages=pages)

            row.line_styles.append(LineStyle(font_size=row_font_pt))

    all_font_size = set()

    all_bg_color = set()

    all_color = set()

    all_font_family = set()

    all_font_weight = set()

    all_font_style = set()

    for table in tables_rebuilt:
        for row in table.table_rows:
            # Rows without any cell do not contribute to the style summary.
            if not row.cells:
                continue

            for style in row.line_styles:
                if style.font_size is not None:
                    all_font_size.add(style.font_size)
                if style.color is not None:
                    all_color.add(style.color)
                if style.font_family is not None:
                    all_font_family.add(style.font_family)
                if style.font_weight is not None:
                    all_font_weight.add(style.font_weight)
                if style.font_style is not None:
                    all_font_style.add(style.font_style)
                if style.background_color is not None:
                    all_bg_color.add(style.background_color)

    print("\n# FOUNDING TABLES STYLES\n")
    print(f"## Font Sizes: {all_font_size}")
    print(f"## Background Colors: {all_bg_color}")
    print(f"## Text Colors: {all_color}")
    print(f"## Font Families: {all_font_family}")
    print(f"## Font Weights: {all_font_weight}")
    print(f"## Font Styles: {all_font_style}")

    bg2icon = assign_icons(all_bg_color, threshold=10.0)
    print(f"## BG → icon mapping : {bg2icon}")  # debug

    return tables_rebuilt, ranges, bg2icon


def build_document_text(
    paragraphs: List[DocumentParagraph],
    tables: List[DocumentTable],
    pages: List[DocumentPage],
    styles: List[DocumentStyle],
    *,
    include_paragraphs: bool = False,
) -> str:
    """
    Rebuild the document as a single string, one element per line.

    Tables are always rendered, with the same alignment rules as
    `Table.print_table()`; the first column is additionally decorated by
    `_first_cell_text`. Each table is followed by a blank line.

    Paragraphs are only emitted when `include_paragraphs` is True (the default
    keeps the previous output, which contains tables only). Paragraphs whose
    first span overlaps a table span are always skipped, since their text is
    already part of that table.

    Debug information about the measured row sizes is printed.
    """
    # 1) rebuilt tables + their span ranges
    tables_built, tbl_ranges, bg2icon = rebuilt_tables(tables, pages, styles)

    assign_h_levels(tables_built)

    print("\nDEBUG – premières cellules et tailles mesurées")
    for t in tables_built:
        for row in t.table_rows:
            fs = next((s.font_size for s in row.line_styles if s.font_size), 0.0)
            first_txt = row.cells[0].content.strip() if row.cells else ""
            print(
                f"  h{row.level_h} | {fs:4.3f} pt | {first_txt[:60]} | polygone {row.cells[0].polygon if row.cells else 'None'} | word use for size {row.cells[0].word_used_for_size if row.cells else 'None'}"
            )
    print("--- fin debug ---\n")

    # 2) unified stream (offset, kind, obj)
    items: List[Tuple[int, str, object]] = []

    if include_paragraphs:
        for p in paragraphs:
            if p.spans and _overlaps(p.spans[0].offset, p.spans[0].length, tbl_ranges):
                continue  # paragraph located inside a table
            items.append((_para_offset(p), "para", p))

    for t in tables_built:
        items.append((t.offset_span, "table", t))

    items.sort(key=lambda x: x[0])  # reading order

    # 3) accumulate in an in-memory buffer
    buf = io.StringIO()

    for _, kind, obj in items:
        if kind == "para":
            buf.write(obj.content.strip() + "\n")
        else:  # table
            # `default` keeps a table without rows from raising in max().
            max_level = max((r.level_h for r in obj.table_rows), default=5) or 5
            col_off = obj._col_offset_max()
            col_w = _table_col_widths(obj, col_off, max_level, bg2icon)

            for row in obj.table_rows:
                # Columns without a cell in this row stay blank at full width.
                parts = [" " * col_w[i] for i in range(len(col_w))]
                for c in row.cells:
                    i = c.column_index
                    indent = (
                        int(c.offset_px * MULT)
                        if i == 0
                        else int(max(0, c.offset_px - col_off[i - 1]) * MULT)
                    )
                    cell_txt = (
                        _first_cell_text(row, c.content, max_level, bg2icon)
                        if i == 0
                        else c.content
                    )
                    parts[i] = (" " * indent + cell_txt).ljust(col_w[i])
                buf.write(SEP.join(parts).rstrip() + "\n")
            buf.write("\n")

    return buf.getvalue()


# Rebuild the text rendering from the OCR result; collections that the
# analysis returned as None are replaced by empty lists.
doc_txt = build_document_text(
    paragraphs=paragraphs or [],
    tables=tables or [],
    pages=pages or [],
    styles=styles or [],
)

# Visual separator between the debug output above and the final text.
print("\n\n\n\n\n####################\n\n\n\n\n")

print(doc_txt)


# Persist the rendering; the section-analysis cell reads it back from this path.
a_out = Path(proposal_path.proposal_html_path)

a_out.parent.mkdir(parents=True, exist_ok=True)

a_out.write_text(doc_txt, encoding="utf-8")

print(f"✅ Proposal text written to {a_out.resolve()}")


# FOUNDING TABLES SPANS

There are 13 tables
Table spans: 483 - 2012 [Désignation]
	- Nombre lignes: 4
	- Nombre colonnes: 5
		- 0: Désignation [2.5446 px] (Désignation, span 500)
		- 0: Qté [5.6243 px] (Qté, span 521)
		- 0: Pu HT [6.173 px] (Pu, span 534)
		- 0: Total HT [6.83 px] (Total, span 549)
		- 0: TVA [7.5303 px] (TVA, span 567)
		- 1: Entrée : Porte circulaire . - Châssis unique version CIRCULAR plaque de plâtre rayon 1200 ouverture à droite pour porte de LP=830 HP=2040 mm - Porte + Habillage + Couvre-joints unique version CIRCULAR plaqué Tanganika rayon 1200 ouverture à droite pour porte de LP= 830 HP=2040 mm - Kiit AGB RONDO (serrure condamnation/decondamnation) finition Chromé Satiné pour porte coulissante bois (épr 40 mm) . SOUSRESERVE D'UN DESCRIPTIF TECHNIQUE PLUS PRECIS POUR ADAPTER AU BESOIN CLIENT [0.4785 px] (Entrée, span 591)
		- 1: 1 [5.7118 px] (1, span 1081)
		- 1: 3 829,44 € [6.0781 px] (3, span 1092)
		- 1: 3 829,44 € [6.8275 px] (3, span 1112)
		- 1: 20 % [7

In [30]:
from openai import OpenAI

from pydantic import BaseModel, Field


# Recursive node of the quote's product structure: a name plus optional
# child nodes of the same type.
# NOTE(review): the docstring and the Field descriptions below are presumably
# part of the schema sent to the model through `response_format`; they are
# kept verbatim (in French) on purpose — confirm before rewording them.
class NoeudStructureProduitsDevis(BaseModel):
    """
    Représente la structure des produits du devis.
    """

    nom: str = Field(
        ...,
        description="Nom du noeud de la structure des produits du devis",
    )

    # Child groups; None when the node has no sub-group.
    sous_groupes_produits: Optional[List["NoeudStructureProduitsDevis"]] = Field(
        None,
        description="Liste des sous-groupes des produits du devis",
    )


# Root object used as `response_format` for the section analysis: the
# top-level product groups of the quote.
# NOTE(review): the Field description is presumably sent to the model with the
# schema, so it is left untouched.
class StructureProduitsDevis(BaseModel):
    groupes_produits: Optional[List["NoeudStructureProduitsDevis"]] = Field(
        None,
        description="Une ou plusieurs groupes de produits du devis. Si il n'y a qu'un seul groupe, la liste est vide.",
    )


PROPOSAL_SECTION_ANALYZER_SYSTEM_PROMPT = """Vous êtes le meilleur analyseur de devis du monde. Vous devez analyser un devis au format HTML pour déterminer la structure des catégories de produits de celui-ci, c'est-à-dire les différentes sections et sous-sections des produits du devis. Vous devez vérifier que le prix total de la catégorie correspond bien au prix des produits listés dans la catégorie. Respectez la structure originale du devis, ne tentez pas de fusionner des lignes. Ne comptez pas les nouvelles pages comme une nouvelle section. N'inventez aucune ligne."""

model = "gpt-4.1"

# NOTE: this rebinds `client`, which held the Document Intelligence client in
# the OCR cell; re-run that cell before analysing another document.
client = OpenAI()

# Upload the original document; the handle is closed once the upload returns.
with open(proposal_path.proposal_path, "rb") as proposal_file:
    file = client.files.create(file=proposal_file, purpose="user_data")

# Text rendering written by the table-reconstruction cell.
with open(
    proposal_path.proposal_html_path,
    "r",
    encoding="utf-8",
) as f:
    ocr_output = f.read()

completion = client.beta.chat.completions.parse(
    model=model,
    messages=[
        {
            "role": "system",
            "content": [
                {"type": "text", "text": PROPOSAL_SECTION_ANALYZER_SYSTEM_PROMPT}
            ],
        },
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Voici le devis:\n\n" + ocr_output},
                {
                    "type": "file",
                    "file": {
                        "file_id": file.id,
                    },
                },
            ],
        },
    ],
    tools=[],
    store=False,
    response_format=StructureProduitsDevis,
    # reasoning_effort="low",
)

proposal_structure: Optional[StructureProduitsDevis] = completion.choices[
    0
].message.parsed

if proposal_structure is None:
    # Stop here with an explicit error instead of failing on the
    # attribute access below.
    logger.error("PROPOSAL => No structured proposal found.")
    raise RuntimeError("PROPOSAL => No structured proposal found.")


proposal_structure_json = proposal_structure.model_dump_json(indent=2)

print(f"Section analysis :\n{proposal_structure_json}")

print(f"Section analysis token :{token_counter(proposal_structure_json)}")

a_out = Path(proposal_path.proposal_section_analysis_path)

a_out.parent.mkdir(parents=True, exist_ok=True)

a_out.write_text(proposal_structure_json, encoding="utf-8")

print(f"✅ Section analysis written to {a_out.resolve()}")

# Usage can be absent from the response; only record the cost when present.
if completion.usage is not None:
    cost_tracker.add_openai_query(
        model=model,
        nb_input_token=completion.usage.prompt_tokens,
        nb_output_token=completion.usage.completion_tokens,
        function_name="proposal_section_analyzer",
    )

print(f"Total cost: {cost_tracker.cost.model_dump_json(indent=2)}")


Section analysis :
{
  "groupes_produits": [
    {
      "nom": "Entrée",
      "sous_groupes_produits": [
        {
          "nom": "Porte circulaire",
          "sous_groupes_produits": null
        }
      ]
    },
    {
      "nom": "Cellier",
      "sous_groupes_produits": [
        {
          "nom": "Porte Plane Kreation - Ame pleine",
          "sous_groupes_produits": null
        },
        {
          "nom": "CLYPSO KREATION pour mur de 71 à 98 mm (n°1)",
          "sous_groupes_produits": null
        },
        {
          "nom": "Béquille B01 Inox PDDT",
          "sous_groupes_produits": null
        }
      ]
    },
    {
      "nom": "Dégagement 2",
      "sous_groupes_produits": [
        {
          "nom": "Porte Plane Kreation - Ame pleine",
          "sous_groupes_produits": null
        },
        {
          "nom": "CLYPSO KREATION pour mur de 71 à 98 mm (n°1)",
          "sous_groupes_produits": null
        },
        {
          "nom": "Béquille B01 Inox PDDT

In [31]:
from app.agent.agent import Agent

# System prompt (French) for the quote analyzer. The fragments are joined by
# implicit string concatenation, so each one must end with a space to keep the
# numbered instructions separated in the final prompt.
PROPOSAL_ANALYZER_SYSTEM_PROMPT = (
    "Vous êtes le meilleur analyseur de devis du monde. Vous devez reconstruire le devis à partir d'un texte brut. "
    "Pour chaque produit du devis, vous devez très précisément noter son label, sa description, le lot auquel il appartient, son prix HT unitaire, la quantité, l'unité, la TVA et les possibles coûts supplémentaires. "
    "Les produits peuvent être groupés par catégories et ou contenir des sous-catégories de produits imbriquées. "
    "1 - Trouvez la structure exacte de chaque catégorie du devis. "
    "2 - Vérifiez que le prix total de la catégorie correspond bien au prix des produits listés dans la catégorie. "
    "3 - Respectez la structure originale du devis, ne tentez pas de fusionner des lignes. "
    "4 - Ne comptez pas les nouvelles pages comme une nouvelle section. "
    "5 - N'inventez aucune ligne. "
)

# Model name used by the analyzer defined in this cell.
model = "gpt-4.1"  # "gpt-4o-mini" or "o4-mini" or "gpt-4o" or "gpt-4o-2024-08-06"


class ProposalAnalyzer:
    """Rebuild a structured `Devis` from the raw text of a quote with an LLM agent."""

    def __init__(self, cost_tracker: CostTracker) -> None:
        """Create the agent used for the analysis.

        Args:
            cost_tracker: Tracker forwarded to the agent's `cost_tracker` argument.
        """
        # The keyword names below (`system_promt`, `temprature`) are kept exactly
        # as this file has always passed them to `Agent`; presumably they match
        # its constructor signature — do not "correct" them here.
        self.agent: Agent = Agent(
            api_key=env_param.OPENAI_API_KEY,
            model_name=model,
            system_promt=PROPOSAL_ANALYZER_SYSTEM_PROMPT,
            nb_retry=2,
            # Temperature 1 for "o4-mini" / "o3", 0 for every other model.
            temprature=1 if model in ["o4-mini", "o3"] else 0,
            timeout=200,
            name="proposal_analyzer",
            cost_tracker=cost_tracker,
            reasoning_effort="high",
        )

    async def analyze(self, chat_historic: List[BaseMessage]) -> Optional[Devis]:
        """Run the agent on a prepared conversation.

        Args:
            chat_historic: Messages sent to the agent.

        Returns:
            Whatever the agent returns for `response_format=Devis`; annotated as
            a `Devis` or None.
        """
        return await self.agent.run(
            response_format=Devis, chat_historic=chat_historic
        )

    async def structure_proposal(
        self, raw_proposal: str, section_analysis: str
    ) -> Optional[Devis]:
        """Structure a quote, using a prior section analysis as a hint.

        Args:
            raw_proposal: Raw text of the quote to analyze.
            section_analysis: Previously computed section structure, already
                serialized as text; it is sent ahead of the raw quote.

        Returns:
            The structured `Devis`, or None when the agent returned nothing
            (an info-level log line is emitted in that case).
        """
        # NOTE(review): the same system prompt is also given to the Agent
        # constructor — confirm the agent does not prepend it a second time.
        chat_historic = [
            SystemMessage(content=PROPOSAL_ANALYZER_SYSTEM_PROMPT),
            HumanMessage(
                content="Voici une première analyse de la structure du devis :\n\n"
                + section_analysis
            ),
            HumanMessage(content=raw_proposal),
        ]

        proposal_structured: Optional[Devis] = await self.analyze(chat_historic)

        if proposal_structured is None:
            logger.info("PROPOSAL => No structured proposal found.")

        return proposal_structured


In [32]:
with open(
    proposal_path.proposal_html_path,
    "r",
    encoding="utf-8",
) as f:
    raw_proposal = f.read()

with open(
    proposal_path.proposal_section_analysis_path,
    "r",
    encoding="utf-8",
) as f:
    section_analysis_raw = f.read()

section_analysis: Optional[StructureProduitsDevis] = (
    StructureProduitsDevis.model_validate_json(section_analysis_raw)
)

quote_analyzer = ProposalAnalyzer(cost_tracker=cost_tracker)

proposal: Optional[Devis] = await quote_analyzer.structure_proposal(
    raw_proposal=raw_proposal,
    section_analysis=section_analysis.model_dump_json(indent=2),
)

if proposal is None:
    logger.error("PROPOSAL => No structured proposal found.")
    exit(1)

proposal_json = proposal.model_dump_json(indent=2)

print(f"PROPOSAL => {proposal_json}")

a_out = Path(proposal_path.proposal_object_predicted_path)

a_out.parent.mkdir(parents=True, exist_ok=True)

a_out.write_text(proposal_json)

print(f"✅ OBJECT prédit écrit dans {a_out.resolve()}")


print(f"Total cost: {cost_tracker.cost.model_dump_json(indent=2)}")


2025-06-27 19:41:53 [92mINFO[0m -> [app/performances/time_counter.py.time_counter.async_wrapper.37]:                                        PERFORMANCES => Function 'run' executed in : 51.529 seconds


PROPOSAL => {
  "devis_total_ht": 14743.18,
  "devis_total_ttc": 17691.82,
  "devis_total_tva": 2948.64,
  "devis_coup_additionnel": null,
  "devis_produits": [
    {
      "label": "Entrée : Porte circulaire",
      "description": "Châssis unique version CIRCULAR plaque de plâtre rayon 1200 ouverture à droite pour porte de LP=830 HP=2040 mm - Porte + Habillage + Couvre-joints unique version CIRCULAR plaqué Tanganika rayon 1200 ouverture à droite pour porte de LP= 830 HP=2040 mm - Kit AGB RONDO (serrure condamnation/décondamnation) finition Chromé Satiné pour porte coulissante bois (ép. 40 mm). SOUS RESERVE D'UN DESCRIPTIF TECHNIQUE PLUS PRECIS POUR ADAPTER AU BESOIN CLIENT",
      "quantite": 1.0,
      "unitee_quantite": null,
      "price_unitaire_ht": 3829.44,
      "tva": "TVA 20%",
      "cout_additionnel": null,
      "sous_produits": null,
      "lot": "MENUISERIE"
    },
    {
      "label": "Cellier : Porte Plane Kreation - Ame pleine",
      "description": "Haut 2040 mm x La

In [None]:
# Build the handler for the currently selected quote, then run its analysis.
# NOTE(review): the output recorded for this cell ends with a FileNotFoundError
# on 'data/proposal_object_know/<file name>.json' — presumably a reference file
# that must exist for the selected FILE_NAME; confirm before re-running.
proposal_object_handler = ProposalObjectHandler(proposal_path=proposal_path)
proposal_object_handler.analyze_proposal_object()


2025-06-27 19:41:53 [92mINFO[0m -> [app/proposal_object/proposal_object.py.proposal_object.convert_to_text.49]:                             ✅ PROPOSAL TEXT write in /Users/remillieux/Documents/Kleek/OCR/data/proposal_text/DEVIS LOIRE MENUISERIES SERVICES (L_M_S_)_GUERIN - TESSIER - charnieres invisbles.txt
2025-06-27 19:41:53 [92mINFO[0m -> [app/proposal_object/proposal_object.py.proposal_object.convert_to_pdf.65]:                              ✅ PROPOSAL PDF write in /Users/remillieux/Documents/Kleek/OCR/data/proposal_pdf_object/DEVIS LOIRE MENUISERIES SERVICES (L_M_S_)_GUERIN - TESSIER - charnieres invisbles.pdf
2025-06-27 19:41:53 [92mINFO[0m -> [app/proposal_object/proposal_object.py.proposal_object.convert_to_proposal_lines.94]:                   ✅ PROPOSAL LINES write in /Users/remillieux/Documents/Kleek/OCR/data/proposal_object_line/DEVIS LOIRE MENUISERIES SERVICES (L_M_S_)_GUERIN - TESSIER - charnieres invisbles.txt
2025-06-27 19:41:53 [92mINFO[0m -> [app/proposal_object

arraybcimunnm = [
    "|    PACK    |   TVA   |                                         LABEL                                          |                                                                                                                                                                                                                            DESCRIPTION                                                                                                                                                                                                                            | QTY | UNIT | UNIT_PRICE_HT | TOTAL_HT |",
    "|------------|---------|----------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

FileNotFoundError: [Errno 2] No such file or directory: 'data/proposal_object_know/DEVIS LOIRE MENUISERIES SERVICES (L_M_S_)_GUERIN - TESSIER - charnieres invisbles.json'

: 