In [80]:
import json
import logging
import os
import re
from pprint import pprint
from typing import Any, Dict, List, Literal, Optional, Tuple
from uuid import uuid4

from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel, Field, create_model
from PyPDF2 import PdfReader

# Notebook-wide logging: root logger at DEBUG. force=True lets this cell be
# re-run in a live kernel and still take effect.
logging.basicConfig(level=logging.DEBUG, force=True)
logger = logging.getLogger(__name__)

# Pull variables from a local .env file into the environment
# (GEMINI_API_KEY is read with os.getenv further down).
load_dotenv()

# Folder containing dataset.json and the PDF files it points to.
DATA_DIR = "../data"

In [81]:
def normalize_whitespace(text: str) -> str:
    """Collapse every run of whitespace (newlines included) into one space.

    Leading and trailing whitespace disappears as a by-product of splitting.
    """
    tokens = text.split()
    return " ".join(tokens)


def normalize_structure(text: str) -> str:
    """Re-insert spaces between fused tokens and tidy whitespace, keeping line breaks.

    The substitutions run in order, each on the output of the previous one.
    Letter classes are ASCII-only (``[a-zA-Z]``), so boundaries next to
    accented letters are not split.
    """
    substitutions = (
        # Letter glued to a digit, e.g. "Seccional101943" -> "Seccional 101943".
        (r"([a-zA-Z])(\d)", r"\1 \2"),
        # Digit glued to a letter.
        (r"(\d)([a-zA-Z])", r"\1 \2"),
        # lower -> Upper boundary marks the start of a new word.
        (r"([a-z])([A-Z])", r"\1 \2"),
        # UPPER run followed by a Capitalised word, e.g. "GOKUInscrição".
        (r"([A-Z])([A-Z][a-z])", r"\1 \2"),
        # Runs of spaces/tabs within a line become one space.
        (r"[ \t]+", " "),
        # Consecutive newlines become a single newline.
        (r"\n+", "\n"),
    )

    cleaned = text
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    # Finally drop whitespace around the whole text.
    return cleaned.strip()


def clean_llm_output(text: str) -> str:
    """Return ``text`` exactly as received.

    Post-processing of extracted values is currently switched off, so this is
    a pass-through hook. The disabled pipeline was:
    ``normalize_whitespace(normalize_structure(text))`` for non-empty input.
    """
    return text


def read_dataset():
    """Parse ``dataset.json`` located in ``DATA_DIR`` and return its contents."""
    dataset_path = os.path.join(DATA_DIR, "dataset.json")
    with open(dataset_path, "r", encoding="utf-8") as handle:
        return json.load(handle)


def get_pdf_text(file_path):
    """Return the text extracted from the only page of the PDF at ``file_path``.

    Raises:
        AssertionError: if the PDF has no pages or more than one page.
    """
    pages = PdfReader(file_path).pages
    page_count = len(pages)

    assert page_count > 0, "PDF has no pages"
    assert page_count == 1, "PDF has more than one page"

    # The text is returned as extracted; running it through
    # normalize_structure is currently disabled.
    return pages[0].extract_text()

In [82]:
# # DEBUG NORMALIZATION
# goku_text = "K"
# clean_text = normalize_text(goku_text)
# print(f"'{clean_text}'")
# # Output: "SON GOKU Inscrição Seccional 101943 PR Subseção"

In [83]:
dataset = read_dataset()

# Attach the extracted PDF text to every dataset entry under "pdf_text".
# Each entry's "pdf_path" is resolved relative to DATA_DIR.
for data in dataset:
    pdf_path = os.path.join(DATA_DIR, data["pdf_path"])
    pdf_text = get_pdf_text(pdf_path)
    data["pdf_text"] = pdf_text

In [84]:
# Sanity check: print the raw text extracted from the first dataset entry.
print(dataset[0]["pdf_text"])

JOANA D'ARC
Inscrição Seccional Subseção
101943 PR CONSELHO SECCIONAL - PARANÁ
SUPLEMENTAR
Endereço Profissional
AVENIDA PAULISTA, Nº 2300 andar Pilotis, Bela Vista
SÃO PAULO - SP
01310300
Telefone Profissional
SITUAÇÃO REGULAR


In [85]:
def create_pydantic_model(schema: Dict[str, Any]) -> type[BaseModel]:
    """Build a pydantic model class from an extraction schema.

    Args:
        schema: Mapping of field name to that field's description.

    Returns:
        A model *class* (not an instance) named ``DynamicModel`` in which every
        key of ``schema`` is an optional string field defaulting to ``None``
        and carrying the schema value as its description.
    """
    fields = {
        name: (str | None, Field(default=None, description=description))
        for name, description in schema.items()
    }
    return create_model("DynamicModel", **fields)


# Give every dataset entry a model class derived from its own extraction schema.
for data in dataset:
    data["pydantic_model"] = create_pydantic_model(data["extraction_schema"])

In [86]:
# Chat model shared by both agents below.
# os.getenv returns None when GEMINI_API_KEY is not set in the environment.
model = init_chat_model(
    "gemini-2.5-flash",
    model_provider="google_genai",
    api_key=os.getenv("GEMINI_API_KEY"),
)

In [87]:
# Portuguese extraction prompt. Placeholders: {text} (document text) and
# {schema} (expected JSON schema). Not referenced by the cells shown in this
# notebook, which format prompt_template_en instead.
prompt_template_pt = """
Você receberá um trecho de texto extraído de um documento.

Com base nesse texto, extraia **exclusivamente** as informações solicitadas e retorne o resultado em **formato JSON**, seguindo rigorosamente o esquema fornecido.

Texto de entrada:
{text}

Esquema JSON esperado:
{schema}

Instruções importantes:
- Se uma informação não estiver presente no texto, atribua o valor **null** ao campo correspondente.
- Não adicione informações que não estejam explícitas no texto.
- Retorne **apenas** o JSON, sem comentários, explicações ou texto adicional.
- Garanta que o JSON seja **válido**, **bem formatado** e **compatível com o esquema**.
"""

# English extraction prompt with a worked verbatim-vs-interpreted example.
# Placeholders: {text} and {schema}. Literal braces in the JSON examples are
# doubled ({{ }}) so that str.format leaves them in place.
prompt_template_en = """
You are a text extraction robot. Your task is to extract information from the `Input text` according to the `Extraction Schema`.

Return **only** a valid JSON object.

---
**Input text:**
{text}
---
**Extraction Schema:**
{schema}
---
**Example of Correct Extraction:**

* **Example Input Text Snippet:**
    "...
    Telefone Profissional
    SITUAÇÃO REGULAR
    ..."

* **Example Schema:**
    `{{"situacao": "Situação do profissional"}}`

* **Correct Output (Verbatim):**
    `{{"situacao": "SITUAÇÃO REGULAR"}}`

* **Incorrect Output (Interpreted/Simplified):**
    `{{"situacao": "REGULAR"}}`
---
**Important Instructions:**
- **CRITICAL:** You must extract the text *verbatim* (exactly as it appears), as shown in the CORRECT example. Do not summarize, interpret, simplify, or rephrase.
- If any information is missing from the text, assign the value **null** to the corresponding field.
- Ensure your output contains **only** the JSON and nothing else (no comments, no explanations).
"""

In [88]:
# Index of the dataset entry that the following cells operate on.
DATA_IDX = 0

In [89]:
# Extraction agent: no tools, structured output shaped by the selected
# entry's dynamically built pydantic model. The commented lines are a
# disabled checkpointer/thread-id setup.
# config = {"configurable": {"thread_id": str(uuid4())}}
# memory = MemorySaver()
agent = create_agent(
    # model=model, tools=[], response_format=OABSchema, checkpointer=memory
    model=model,
    tools=[],
    response_format=dataset[DATA_IDX]["pydantic_model"],
)

In [90]:
# Show the text of the entry selected by DATA_IDX before sending it to the agent.
print(dataset[DATA_IDX]["pdf_text"])

JOANA D'ARC
Inscrição Seccional Subseção
101943 PR CONSELHO SECCIONAL - PARANÁ
SUPLEMENTAR
Endereço Profissional
AVENIDA PAULISTA, Nº 2300 andar Pilotis, Bela Vista
SÃO PAULO - SP
01310300
Telefone Profissional
SITUAÇÃO REGULAR


In [91]:
# Run the extraction agent on the selected document.
# The {schema} slot receives the field -> description mapping serialised as
# JSON, the same shape as the example schema inside the prompt. Formatting the
# pydantic model class there would only insert its class repr, which carries
# no field information. ensure_ascii=False keeps accented descriptions readable.
response = agent.invoke(
    {
        "messages": [
            {
                "role": "user",
                "content": prompt_template_en.format(
                    text=dataset[DATA_IDX]["pdf_text"],
                    schema=json.dumps(
                        dataset[DATA_IDX]["extraction_schema"],
                        ensure_ascii=False,
                        indent=2,
                    ),
                ),
            }
        ]
    },
    # config=config,
)


In [92]:
# Structured output returned by the extraction agent.
print(response["structured_response"])

nome="JOANA D'ARC" inscricao='101943' seccional='PR' subsecao='CONSELHO SECCIONAL - PARANÁ' categoria='SUPLEMENTAR' endereco_profissional='AVENIDA PAULISTA, Nº 2300 andar Pilotis, Bela Vista\nSÃO PAULO - SP\n01310300' telefone_profissional=None situacao='SITUAÇÃO REGULAR'


In [93]:
# Pass every extracted value through clean_llm_output, then rebuild the model
# instance from the cleaned values and store it back on the response.
model_dict = response["structured_response"].model_dump()
normalized_dict = dict(
    (name, clean_llm_output(raw_value)) for name, raw_value in model_dict.items()
)
response["structured_response"] = dataset[DATA_IDX]["pydantic_model"](**normalized_dict)

In [94]:
# Structured response after the clean_llm_output pass.
print(response["structured_response"])

nome="JOANA D'ARC" inscricao='101943' seccional='PR' subsecao='CONSELHO SECCIONAL - PARANÁ' categoria='SUPLEMENTAR' endereco_profissional='AVENIDA PAULISTA, Nº 2300 andar Pilotis, Bela Vista\nSÃO PAULO - SP\n01310300' telefone_profissional=None situacao='SITUAÇÃO REGULAR'


In [95]:
# Define the exact types of rules that your system understands
# Rule kinds that execute_rule dispatches on; each kind uses a different
# subset of the fields declared on Rule.
RuleType = Literal["regex", "keyword", "position"]

# Behaviours available to rules of type "keyword" once the anchor keyword is found.
KeywordStrategy = Literal["next_line", "multiline_until_stop", "conditional_null"]


class Rule(BaseModel):
    """Stores a single extraction rule generated by the Learning Loop.

    ``type`` selects which group of optional fields is meaningful:
    ``rule`` for "regex"; ``keyword``/``strategy``/``stop_keyword`` for
    "keyword"; ``line_number`` for "position". ``validation_regex`` applies
    to every type.

    Descriptions that contain regex examples are raw strings so that ``\d``
    is not parsed as an (invalid) string escape sequence.
    """

    # --- Main Discriminator Field ---
    type: RuleType = Field(
        ..., description="The main type of the rule (regex, keyword, or position)"
    )

    # --- 1. For type="regex" ---
    rule: Optional[str] = Field(
        None,
        description=r"The regex pattern to be executed. (Ex: 'Inscrição[^\d]*(\d{6})')",
    )

    # --- 2. For type="keyword" ---
    keyword: Optional[str] = Field(
        None, description="The 'anchor' keyword to search for in the text."
    )

    strategy: Optional[KeywordStrategy] = Field(
        None, description="The action to take after finding the keyword."
    )

    stop_keyword: Optional[str] = Field(
        None,
        description="Where to stop for 'multiline' or what to check for 'conditional'.",
    )

    # --- 3. For type="position" ---
    line_number: Optional[int] = Field(
        None,
        description="The line number to extract (e.g., 1 for the first line).",
    )

    # --- 4. Validation (Applies to all types) ---
    # NOTE(review): this field is required, while the rule-generation prompt
    # asks for a null validation_regex when the field value is null; such a
    # response would fail schema validation. Confirm which side is intended.
    validation_regex: str = Field(
        ...,
        description=r"A simple regex to validate the *format* of the extracted value (e.g., '^\d{6}$').",
    )

In [96]:
# Prompt used to ask the rule agent for one extraction rule plus its
# validation regex. Placeholders: {text}, {field_name}, {field_value},
# {field_description}; literal braces are doubled for str.format.
# Declared as a raw string because the embedded regex examples contain \d and
# \s, which are invalid escape sequences in a normal string literal. The
# resulting text is unchanged.
rule_generation_prompt_template_en = r"""
You are an expert automation engineer specializing in robust text extraction.
Your task is to generate **two** mandatory items:
1.  A **single, robust extraction rule** for a specific data field.
2.  A **mandatory `validation_regex`** to verify the format of the extracted data.

The goal is to create an "atomic" rule that can find this value in future documents. The rule MUST be based on stable "anchor" keywords (like "Inscrição", "Endereço Profissional") or patterns directly related to **itself**, not based on the position of *other* fields.

**Crucial Constraint: What to AVOID**
* **DO NOT** create rules that depend on the relative position of *other* fields.
* **Bad Rule (Coupled):** "Find the text on the line after the 'inscricao' field."
* **Good Rule (Atomic):** "Find the text on the line after the keyword 'Subseção'."

---
**ANALYSIS PATHS:**

**PATH A: If `field_value` is NOT null (e.g., "JOANA D'ARC")**
1.  **Locate:** Find the `field_value` in the `full_text`.
2.  **Find Anchor:** Analyze the text *immediately* surrounding the value to find a stable, unique keyword (like "Nome", "Inscrição", etc.).
3.  **Generate Extraction Rule:** Create the best possible extraction rule (`type`, `rule`, etc.).
4.  **Generate Validation Regex:** Analyze the `field_value` and create a `validation_regex` that matches its *format*. **This is a mandatory step.**

**PATH B: If `field_value` IS null**
1.  **Locate Anchor:** Find the "anchor" keyword for the field (e.g., "Telefone Profissional") in the `full_text`.
2.  **Find Stop-Anchor:** Analyze the text *immediately following* this anchor. Find the *next* field's anchor (e.g., "SITUAÇÃO REGULAR").
3.  **Generate Extraction Rule:** Create a `conditional_null` rule.
4.  **Generate Validation Regex:** For a `null` value, the `validation_regex` **must be `null`**.

---
**INPUTS:**

**1. Full Text (`full_text`):**
{text}

**2. Field to Analyze (`field_name`):**
"{field_name}"

**3. Extracted Value (`field_value`):** (This could be `null` or `None`)
"{field_value}"

**4. Field Description (`field_description`):**
"{field_description}"

---
**OUTPUT INSTRUCTIONS:**

Return **only** a single, valid JSON object for the generated rule, strictly adhering to the following `Rule` schema.
**Both the extraction rule and the `validation_regex` are mandatory outputs** (unless `field_value` is `null`).

**Rule Schema:**
{{
    "type": "The type of rule. Use 'regex' whenever possible.",
    "rule": "The Python-compatible regex pattern. It MUST include a capture group ( ).",
    "keyword": "The 'anchor' keyword (use if 'regex' is not possible).",
    "strategy": "The strategy for the 'keyword' (e.g., 'next_line', 'multiline_until_stop', 'conditional_null').",
    "stop_keyword": "The stopping keyword for the strategy.",
    "line_number": "The line number (use 'position' only as a last resort).",
    "validation_regex": "MANDATORY. A regex to validate the *format* of the extracted value (e.g., '^\d{{6}}$'). Must be `null` if and only if `field_value` is `null`."
}}

**Example for a `regex` rule (PATH A):**
{{
    "type": "regex",
    "rule": "Inscrição[^\d]*(\d{{6}})",
    "keyword": null,
    "strategy": null,
    "stop_keyword": null,
    "line_number": null,
    "validation_regex": "^\d{{6}}$"
}}

**Example for a `keyword` rule (PATH A):**
{{
    "type": "keyword",
    "rule": null,
    "keyword": "Subseção",
    "strategy": "next_line",
    "stop_keyword": null,
    "line_number": null,
    "validation_regex": "^[A-Z\s-]+$"
}}

**Example for a `conditional_null` rule (PATH B):**
{{
    "type": "keyword",
    "rule": null,
    "keyword": "Telefone Profissional",
    "strategy": "conditional_null",
    "stop_keyword": "SITUAÇÃO",
    "line_number": null,
    "validation_regex": null
}}

**Your Turn:**
Generate the rule for the field `"{field_name}"`.
"""

In [97]:
# config = {"configurable": {"thread_id": str(uuid4())}}
# memory = MemorySaver()
# Rule-generation agent: same chat model, no tools, structured output
# constrained to the Rule schema.
agent_rule = create_agent(
    # model=model, tools=[], response_format=Rule, checkpointer=memory
    model=model,
    tools=[],
    response_format=Rule,
)

In [98]:
def create_rule_generation_prompt(
    text: str, field_name: str, field_value: str, field_description: str
) -> str:
    """Fill the rule-generation prompt template for a single field.

    Args:
        text: Full document text shown to the model.
        field_name: Name of the field a rule is requested for.
        field_value: Value previously extracted for that field.
        field_description: Description of the field from the extraction schema.

    Returns:
        The formatted prompt string.
    """
    placeholders = {
        "text": text,
        "field_name": field_name,
        "field_value": field_value,
        "field_description": field_description,
    }
    return rule_generation_prompt_template_en.format(**placeholders)

In [99]:
def _execute_position_rule(rule: Rule, text: str) -> Optional[str]:
    """Applies a 'position' type rule."""
    if not rule.line_number or rule.line_number <= 0:
        return None  # Invalid line number

    lines = text.splitlines()

    # -1 because line_number is 1-based, list is 0-based
    line_index = rule.line_number - 1

    if line_index < len(lines):
        return lines[line_index].strip()

    return None  # Line number out of range


def _execute_keyword_rule(rule: Rule, text: str) -> Optional[str]:
    """Applies a 'keyword' type rule."""
    if not rule.keyword:
        return None

    # Find the keyword
    keyword_pos = text.find(rule.keyword)
    if keyword_pos == -1:
        return None  # Keyword not found

    # Get all text *after* the keyword
    start_pos = keyword_pos + len(rule.keyword)
    text_after = text[start_pos:]

    # --- Apply Strategy ---

    if rule.strategy == "next_line":
        # Finds the first non-empty line after the keyword
        for line in text_after.splitlines():
            stripped_line = line.strip()
            if stripped_line:
                return stripped_line
        return None  # No non-empty line found

    elif rule.strategy == "multiline_until_stop":
        if not rule.stop_keyword:
            return text_after.strip()  # No stop, return all text after

        # Find the stop_keyword *in the text after the keyword*
        stop_pos = text_after.find(rule.stop_keyword)

        if stop_pos != -1:
            # Return everything between keyword and stop_keyword
            return text_after[:stop_pos].strip()
        else:
            # Stop keyword wasn't found, so this rule fails
            return None

    elif rule.strategy == "conditional_null":
        # This rule's job is to confirm a value is null.
        # It succeeds *if* the text between keyword and stop_keyword is empty.
        if not rule.stop_keyword:
            return None  # This strategy requires a stop_keyword

        # Check if the text immediately after the keyword (when stripped)
        # starts with the stop_keyword.
        if text_after.strip().startswith(rule.stop_keyword):
            return None  # Success: The value is correctly identified as null.
        else:
            # Failure: There is text between the keywords.
            # This rule's logic does not match.
            return None

    return None


# (Assumes your Rule, RuleType, and KeywordStrategy models are defined)
def _execute_regex_rule(rule: Rule, text: str) -> Optional[str]:
    """Applies a 'regex' type rule."""
    if not rule.rule:
        return None

    # re.DOTALL makes the '.' special character match any character,
    # including a newline. This is crucial for multi-line fields.
    match = re.search(rule.rule, text, re.DOTALL)

    if match:
        try:
            # .group(1) extracts the text from the *first capture group*
            # This is the standard for extraction regex.
            return match.group(1).strip()
        except IndexError:
            # This happens if the regex matched but had NO capture group.
            # We can fall back to group(0), the full match.
            return match.group(0).strip()

    return None  # No match found


def execute_rule(rule: Rule, text: str) -> Optional[str]:
    """
    Executes a given extraction rule on the input text.

    This function acts as a dispatcher, calling the correct
    sub-function based on the rule's 'type'.
    """
    try:
        if rule.type == "regex":
            return _execute_regex_rule(rule, text)
        elif rule.type == "keyword":
            return _execute_keyword_rule(rule, text)
        elif rule.type == "position":
            return _execute_position_rule(rule, text)
    except Exception as e:
        # Catch any unexpected errors during rule execution
        print(f"Error executing rule (type: {rule.type}, rule: {rule.rule}): {e}")
        return None

    return None

In [100]:
# DEBUG CELL

# pdf_text = dataset[DATA_IDX]["pdf_text"]
# obj_schema = dataset[DATA_IDX]["extraction_schema"]
# llm_response = response["structured_response"].model_dump()

# base_prompt = create_rule_generation_prompt(
#     pdf_text, "nome", llm_response["nome"], dataset[DATA_IDX]["extraction_schema"]["nome"]
# )
# response_rule = agent_rule.invoke(
#     {"messages": [{"role": "user", "content": base_prompt}]}
# )
# execute_rule(response_rule["structured_response"], pdf_text)

In [101]:
def _validate_syntax(
    response: Dict[str, Any],
    field_name: str,
    attempt: int,
    logger: logging.Logger,
) -> Tuple[Optional[Rule], Optional[str]]:
    """Validate the agent response against the ``Rule`` schema.

    Args:
        response: Agent output; the rule is read from its
            "structured_response" entry (an empty dict is validated if absent).
        field_name: Field the rule is generated for (logging only).
        attempt: 1-based attempt number used in logs and feedback.
        logger: Logger receiving the outcome.

    Returns:
        ``(rule, None)`` on success, ``(None, feedback)`` on failure, where
        ``feedback`` is the text to send back to the model on the next attempt.
    """
    stage = "syntax_validation"
    try:
        rule = Rule.model_validate(response.get("structured_response", {}))
    except Exception as e:
        feedback = (
            f"- ATTEMPT {attempt} FAILED: The JSON was malformed or failed schema validation. "
            f"Error: {e}. Make sure to return ONLY valid JSON matching the schema."
        )
        # Plain dict: a trailing comma after the braces would wrap it in a
        # one-element tuple and log it as "({...},)".
        extra = {
            "field": field_name,
            "attempt": attempt,
            "stage": stage,
            "error": str(e),
            "raw_response": response,
        }
        logger.warning("Syntax validation failed: %s", extra)
        return None, feedback

    extra = {
        "field": field_name,
        "attempt": attempt,
        "stage": stage,
    }
    logger.debug("Syntax validation successful: %s", extra)
    return rule, None


def _validate_extraction_rule(
    rule: Rule,
    text: str,
    field_value: str,
    field_name: str,
    attempt: int,
    logger: logging.Logger,
) -> Optional[str]:
    """Check that executing ``rule`` on ``text`` reproduces ``field_value`` exactly.

    Args:
        rule: Candidate rule to execute.
        text: Document text the rule is run against.
        field_value: Value the rule is expected to extract.
        field_name: Field the rule is generated for (logging only).
        attempt: 1-based attempt number used in logs and feedback.
        logger: Logger receiving the outcome.

    Returns:
        None when the extracted value equals ``field_value``; otherwise a
        feedback string describing the mismatch or the execution error.
    """
    stage = "extraction_validation"
    extracted_val = None
    try:
        extracted_val = execute_rule(rule, text)

        if extracted_val == field_value:
            # Plain dict: a trailing comma after the braces would wrap it in
            # a one-element tuple and log it as "({...},)".
            extra = {
                "field": field_name,
                "attempt": attempt,
                "stage": stage,
                "rule": rule.rule,
                "extracted": extracted_val,
            }
            logger.debug("Extraction rule validation successful: %s", extra)
            return None  # Success

        # Mismatch failure
        feedback = (
            f"- ATTEMPT {attempt} FAILED: The 'rule' was wrong. "
            f"Rule: `{rule.rule}`\n"
            f"  - Extracted: `{extracted_val}`\n"
            f"  - Expected: `{field_value}`\n"
            f"Please create a more precise regex."
        )
        extra = {
            "field": field_name,
            "attempt": attempt,
            "stage": stage,
            "rule": rule.rule,
            "extracted": extracted_val,
            "expected": field_value,
        }
        logger.warning("Extraction rule mismatch: %s", extra)
        return feedback

    except Exception as e:
        # Execution error
        feedback = (
            f"- ATTEMPT {attempt} FAILED: Error executing 'rule' regex. "
            f"Rule: `{rule.rule}`. Error: {e}"
        )
        extra = {
            "field": field_name,
            "attempt": attempt,
            "stage": stage,
            "rule": rule.rule,
            "error": str(e),
        }
        logger.warning("Extraction rule execution error: %s", extra)
        return feedback


def _validate_validation_regex(
    rule: Rule,
    field_value: str,
    field_name: str,
    attempt: int,
    logger: logging.Logger,
) -> Optional[str]:
    stage = "validation_regex_validation"
    try:
        # 6. Validate Validation Execution (the 'validation_regex')
        if re.match(rule.validation_regex, field_value):
            extra = (
                {
                    "field": field_name,
                    "attempt": attempt,
                    "stage": stage,
                    "validation_regex": rule.validation_regex,
                    "value_matched": field_value,
                },
            )
            logger.debug(
                f"Validation regex validation successful: {extra}",
            )
            return None  # Success

        # Mismatch failure
        feedback = (
            f"- ATTEMPT {attempt} FAILED: The 'validation_regex' was wrong.\n"
            f"  - Regex: `{rule.validation_regex}`\n"
            f"  - Did not match the expected value: `{field_value}`\n"
            f"  - Please create a 'validation_regex' that fully matches the expected value."
        )
        extra = (
            {
                "field": field_name,
                "attempt": attempt,
                "stage": stage,
                "validation_regex": rule.validation_regex,
                "expected_to_match": field_value,
            },
        )
        logger.warning(
            f"Validation regex mismatch: {extra}",
        )
        return feedback

    except Exception as e:
        # Execution error
        feedback = (
            f"- ATTEMPT {attempt} FAILED: Error in 'validation_regex'. "
            f"Regex: `{rule.validation_regex}`. Error: {e}"
        )
        extra = (
            {
                "field": field_name,
                "attempt": attempt,
                "stage": stage,
                "validation_regex": rule.validation_regex,
                "error": str(e),
            },
        )
        logger.warning(
            f"Validation regex execution error: {extra}",
        )
        return feedback


def generate_robust_rule(
    text: str,
    field_name: str,
    field_value: str,
    field_description: str,
    max_attempts: int = 3,
) -> Optional[Rule]:
    """Ask the rule agent for an extraction rule, retrying with feedback until it validates.

    Every attempt goes through three checks in order: schema validation of
    the response, execution of the rule against ``text`` (must reproduce
    ``field_value``), and a check of ``validation_regex`` against
    ``field_value``. Feedback from each failed check is accumulated and
    appended to the prompt of the following attempt.

    Args:
        text: Full document text.
        field_name: Field a rule is generated for.
        field_value: Known value of that field in ``text``.
        field_description: Description of the field from the extraction schema.
        max_attempts: Maximum number of agent invocations.

    Returns:
        The first rule that passes all three checks, or None when every
        attempt failed (an alert is also printed in that case).
    """
    # Log payloads are plain dicts: a trailing comma after the braces would
    # wrap them in a one-element tuple and log them as "({...},)".
    extra = {
        "field": field_name,
        "field_value": field_value,
    }
    logger.info(
        f"Starting robust rule generation for field '{field_name}' with value '{field_value}': {extra}"
    )

    # 1. Prepare the base prompt
    base_prompt = create_rule_generation_prompt(
        text, field_name, field_value, field_description
    )
    feedback_history: List[str] = []  # Stores feedback from failures

    for attempt in range(max_attempts):
        current_attempt = attempt + 1
        extra = {
            "field": field_name,
            "current_attempt": current_attempt,
            "max_attempts": max_attempts,
        }
        logger.info(
            f"Starting attempt {current_attempt}/{max_attempts} for field '{field_name}': {extra}"
        )

        # 2. Build the prompt with feedback (if any)
        current_prompt = base_prompt
        if feedback_history:
            feedback_str = "\n".join(feedback_history)
            current_prompt += f"""
            ---
            You have tried before. Analyze the feedback and generate a new rule.

            FEEDBACK FROM PREVIOUS ATTEMPTS:
            {feedback_str}

            Generate a new and CORRECTED rule JSON:
            """
            extra = {
                "current_attempt": current_attempt,
                "feedback_count": len(feedback_history),
            }
            logger.debug(
                f"Feedback added to prompt for attempt {current_attempt}: {extra}"
            )

        # 3. Invoke Agent (LLM)
        extra = {
            "current_attempt": current_attempt,
        }
        logger.info(f"Invoking agent for attempt {current_attempt}: {extra}")
        response = agent_rule.invoke(
            {"messages": [{"role": "user", "content": current_prompt}]}
        )

        # 4. Validate Syntax
        rule, feedback = _validate_syntax(response, field_name, current_attempt, logger)
        if feedback:
            feedback_history.append(feedback)
            continue  # Try again

        # 5. Validate Rule Execution
        feedback = _validate_extraction_rule(
            rule, text, field_value, field_name, current_attempt, logger
        )
        if feedback:
            feedback_history.append(feedback)
            continue  # Try again

        # 6. Validate Validation Regex
        feedback = _validate_validation_regex(
            rule, field_value, field_name, current_attempt, logger
        )
        if feedback:
            feedback_history.append(feedback)
            continue  # Try again

        # 7. Success!
        # All three validations (Syntax, Rule, Validation) passed.
        extra = {
            "field": field_name,
            "final_attempt": current_attempt,
            "final_feedback_history": "\n".join(feedback_history),
        }
        logger.info(
            f"Rule generation successful for field '{field_name}' after {current_attempt} attempts: {extra}",
        )
        return rule

    # 8. Failure (after max_attempts)
    extra = {
        "field": field_name,
        "attempts": max_attempts,
        "final_feedback_history": "\n".join(feedback_history),
    }
    logger.error(
        f"Failed to generate a valid rule for field '{field_name}' after {max_attempts} attempts: {extra}",
    )
    print(f"ALERT: Failed to generate a valid rule for field '{field_name}'.")
    return None


In [102]:
# def generate_robust_rule(
#     text, field_name, field_value, field_description, max_attempts=3
# ) -> Optional[Rule]:
#     logger.debug(
#         f"Starting rule generation for field '{field_name}' with value '{field_value}'"
#     )

#     # 1. Prepare the prompt
#     #    (This is the prompt that requests the complete JSON,
#     #     including 'rule' AND 'validation_regex')
#     base_prompt = create_rule_generation_prompt(
#         text, field_name, field_value, field_description
#     )
#     logger.debug(f"Base prompt prepared for field '{field_name}'")

#     feedback_history = []  # Stores feedback from failures

#     for attempt in range(max_attempts):
#         logger.debug(f"Attempt {attempt + 1} for field '{field_name}'")

#         # 2. Build the prompt with feedback (if any)
#         current_prompt = base_prompt
#         if feedback_history:
#             feedback_str = "\n".join(feedback_history)
#             current_prompt += f"""
#             ---
#             You have tried before. Analyze the feedback and generate a new rule.

#             FEEDBACK FROM PREVIOUS ATTEMPTS:
#             {feedback_str}

#             Generate a new and CORRECTED rule JSON:
#             """
#             logger.debug(f"Feedback added to prompt for attempt {attempt + 1}")

#         # 3. Invoke Agent (LLM2)
#         logger.debug(f"Invoking agent for attempt {attempt + 1}")
#         response = agent_rule.invoke(
#             {"messages": [{"role": "user", "content": current_prompt}]}
#         )

#         # 4. Validate Syntax (Schema)
#         try:
#             rule = Rule.model_validate(response["structured_response"])
#             logger.debug(f"Rule validation successful for attempt {attempt + 1}")
#         except Exception as e:
#             feedback_history.append(
#                 f"- ATTEMPT {attempt + 1} FAILED: The JSON was malformed. "
#                 f"Error: {e}. Make sure to return ONLY JSON."
#             )
#             logger.warning(f"Rule validation failed for attempt {attempt + 1}: {e}")
#             continue  # Try again

#         # 5. Validate Rule Execution (the 'rule')
#         try:
#             extracted_val = execute_rule(rule, text)  # Your re.search() function
#             logger.debug(
#                 f"Extracted value for attempt {attempt + 1}: '{extracted_val}'"
#             )
#         except Exception as e:
#             feedback_history.append(
#                 f"- ATTEMPT {attempt + 1} FAILED: Error executing rule. Error: {e}"
#             )
#             logger.warning(f"Rule execution error for attempt {attempt + 1}: {e}")
#             continue  # Try again

#         if extracted_val != field_value:
#             feedback_history.append(
#                 f"- ATTEMPT {attempt + 1} FAILED: The 'rule' was wrong. "
#                 f"Rule: `{rule.rule}`\n"
#                 f"   - Extracted: `{extracted_val}`\n"
#                 f"   - Expected: `{field_value}`\n"
#                 f"Please create a more precise regex."
#             )
#             logger.warning(
#                 f"Rule execution failed for attempt {attempt + 1}: extracted '{extracted_val}' != expected '{field_value}'"
#             )
#             continue  # Try again

#         # 6. Validate Validation Execution (the 'validation_regex')
#         try:
#             if not re.match(rule.validation_regex, field_value):
#                 feedback_history.append(
#                     f"- ATTEMPT {attempt + 1} FAILED: The 'validation_regex' was wrong."
#                     f"Regex: `{rule.validation_regex}`\n"
#                     f"   - Did not match the expected value: `{field_value}`\n"
#                     f"Please create a 'validation_regex' that matches."
#                 )
#                 logger.warning(
#                     f"Validation regex failed for attempt {attempt + 1}: regex '{rule.validation_regex}' did not match '{field_value}'"
#                 )
#                 continue  # Try again
#         except Exception as e:
#             feedback_history.append(
#                 f"- ATTEMPT {attempt + 1} FAILED: Error in validation regex. Error: {e}"
#             )
#             logger.warning(f"Validation regex error for attempt {attempt + 1}: {e}")
#             continue  # Try again

#         # 7. Success!
#         # All three validations (Syntax, Rule, Validation) passed.
#         logger.debug(
#             f"Rule generation successful for field '{field_name}' after {attempt + 1} attempts"
#         )
#         return rule

#     # 8. Failure (after max_attempts)
#     logger.error(
#         f"Failed to generate a valid rule for field '{field_name}' after {max_attempts} attempts"
#     )
#     print(f"ALERT: Failed to generate a valid rule for field '{field_name}'.")
#     return None

In [103]:
pdf_text = dataset[DATA_IDX]["pdf_text"]
obj_schema = dataset[DATA_IDX]["extraction_schema"]
llm_response = response["structured_response"].model_dump()

# Field name -> validated Rule (or None when generation failed).
generated_rules = {}

for field, value in llm_response.items():
    print("Generating rule for field:", field, "with value:", value)
    if value is not None:
        # Inputs for the rule-generation prompt. The prompt itself is built
        # inside generate_robust_rule, so it is not formatted here as well.
        field_name = field
        field_value = value
        field_description = obj_schema[field]

        rule_object = generate_robust_rule(
            pdf_text, field_name, field_value, field_description, max_attempts=5
        )

        generated_rules[field] = rule_object
        print("Generated rule for field", field, ":", generated_rules[field])
    # NOTE(review): unconditional break - only the first field is ever
    # processed (and nothing is generated if its value is None). Presumably a
    # debugging limit to save model calls; remove to cover all fields.
    break

# # Now you have a very high-quality "Heuristic Store"
# save_rules_to_store("carteira_oab", generated_rules)

INFO:__main__:Starting robust rule generation for field 'nome' with value 'JOANA D'ARC': ({'field': 'nome', 'field_value': "JOANA D'ARC"},)
INFO:__main__:Starting attempt 1/5 for field 'nome': ({'field': 'nome', 'current_attempt': 1, 'max_attempts': 5},)
INFO:__main__:Invoking agent for attempt 1: ({'current_attempt': 1},)


Generating rule for field: nome with value: JOANA D'ARC


DEBUG:__main__:Syntax validation successful: ({'field': 'nome', 'attempt': 1, 'stage': 'syntax_validation'},)
INFO:__main__:Starting attempt 2/5 for field 'nome': ({'field': 'nome', 'current_attempt': 2, 'max_attempts': 5},)
DEBUG:__main__:Feedback added to prompt for attempt 2: ({'current_attempt': 2, 'feedback_count': 1},)
INFO:__main__:Invoking agent for attempt 2: ({'current_attempt': 2},)
DEBUG:__main__:Syntax validation successful: ({'field': 'nome', 'attempt': 2, 'stage': 'syntax_validation'},)
DEBUG:__main__:Extraction rule validation successful: ({'field': 'nome', 'attempt': 2, 'stage': 'extraction_validation', 'rule': "^([A-Z ']+)", 'extracted': "JOANA D'ARC"},)
DEBUG:__main__:Validation regex validation successful: ({'field': 'nome', 'attempt': 2, 'stage': 'validation_regex_validation', 'validation_regex': "^[A-Z ']+$", 'value_matched': "JOANA D'ARC"},)
INFO:__main__:Rule generation successful for field 'nome' after 2 attempts: ({'field': 'nome', 'final_attempt': 2, 'final_f

Generated rule for field nome : type='regex' rule="^([A-Z ']+)" keyword=None strategy=None stop_keyword=None line_number=None validation_regex="^[A-Z ']+$"


In [104]:
# Display the rules collected so far (field name -> Rule).
generated_rules

{'nome': Rule(type='regex', rule="^([A-Z ']+)", keyword=None, strategy=None, stop_keyword=None, line_number=None, validation_regex="^[A-Z ']+$")}

In [105]:
def apply_rules_to_text(rules: Dict[str, Any], text: str) -> Dict[str, str | None]:
    results = {}

    for field_name, rule in rules.items():
        value = None
        if rule is not None:
            value = execute_rule(rule, text)
        value = clean_llm_output(value) if value is not None else None
        results[field_name] = value

    return results


# Re-extract the fields from the same document using only the generated
# rules (no model call) and show the result.
extracted_values = apply_rules_to_text(generated_rules, pdf_text)
pprint(extracted_values)

{'nome': "JOANA D'ARC"}
