In [None]:
%pip install -qU langchain_community langchain_core easyocr pdf2image

In [2]:
# GENERAL IMPORTS
import re
from pathlib import Path
from typing import List, Dict

from langchain_core.documents import Document
from rich import print
from rich.markup import escape
from tqdm import tqdm

# RICH'S PRINT COLORS
# Hex color codes interpolated into Rich markup tags, e.g. f"[{RED}]text[/]".
YELLOW = "#fde047"
ORANGE = "#f97316"
RED = "#ef4444"
BLUE = "#3b82f6"
CYAN = "#06b6d4"
EMERALD = "#34d399"
VIOLET = "#a855f7"
PINK = "#ec4899"
GRAY = "#64748b"
WHITE = "#cccccc"
GREEN = "#3fb618"

# PATHS
# Relative paths: they resolve against the kernel's current working directory.
ROOT_DIR = Path("../../../../COLEGA DATA")
PDF_DIR = ROOT_DIR / "notificaciones"  # directory passed to directory_loader()
PDF_DIR_2 = ROOT_DIR / "MÉTODO DE LA DEMANDA Y SU CONTESTACIÓN" / "CAPS"
PDF_FILE_1 = PDF_DIR / "RES 04-04-2024 - DILIGENCIA PRELIMINAR.pdf"
PDF_FILE_2 = PDF_DIR_2 / "1_EL_CASO_Y_SU_SOLUCIÓN.pdf"  # file used by the single-file OCR test

In [10]:
def search_dir(dir_path: str, file_ext: str) -> List[Dict[str, str]]:
    """FILE'S SEARCH IN A GIVEN DIRECTORY"""
    dir_path = Path(dir_path)

    if not dir_path.is_dir():
        raise ValueError(f"search_dir() => DIRECTORY ({dir_path}) DOESN'T EXIST.")

    if not any(dir_path.iterdir()):
        raise ValueError(f"search_dir() => DIRECTORY ({dir_path}) IS EMPTY.")

    if not file_ext.startswith("."):
        file_ext = f".{file_ext}"

    # SEARCH FOR WANTED FILES
    files_info: List[Dict[str, str]] = [
        {"filename": f.name, "filepath": str(f)}
        for f in dir_path.glob(f"*{file_ext}")
        if f.is_file()
    ]

    # CHECK IF FILES WERE FOUND
    if not files_info:
        raise ValueError(
            f"search_dir() => NO FILES WITH EXTENSION ({file_ext}) WERE FOUND IN DIRECTORY ({dir_path})."
        )

    return files_info


def text_cleaner(text: str) -> str:
    """
    Normalize raw extracted text.

    Steps, in order:
      1. Non-breaking spaces become regular spaces.
      2. Runs of three or more "-" and of two or more "#" are removed.
      3. Runs of spaces collapse to a single space.
      4. Runs of three or more line breaks collapse to one blank line.
      5. Every blank-line-separated paragraph, and the whole text, is stripped.

    The removals run before the whitespace normalization so that deleting a
    run can't leave double spaces or extra blank lines behind.

    Args:
        text: Text to clean.

    Returns:
        The cleaned text. Best-effort: if `text` is not a string, the problem
        is printed and the value is returned unchanged.
    """
    try:
        # Non-breaking space -> regular space
        text = text.replace("\xa0", " ")
        # Drop separator runs first: >=3 dashes and >=2 hash symbols
        text = re.sub(r"-{3,}", "", text)
        text = re.sub(r"#{2,}", "", text)
        # Multiple spaces -> single space
        text = re.sub(r" +", " ", text)
        # >=3 line breaks -> one blank line
        text = re.sub(r"\n{3,}", "\n\n", text)
        # Trim each paragraph, then the text as a whole
        text = "\n\n".join(paragraph.strip() for paragraph in text.split("\n\n"))
        text = text.strip()

    except (TypeError, AttributeError) as e:
        # Only non-string input can fail above; keep the original lenient
        # behavior of reporting it instead of raising.
        print(f"An error occurred while cleaning the text: {e}")

    return text


def is_text_corrupt(text: str, min_valid_ratio: float = 0.7) -> bool:
    """
    Heuristically decide whether extracted text is corrupt or badly decoded.

    Text is flagged as corrupt when it is empty or whitespace-only, or when
    the share of alphabetic and whitespace characters is below
    `min_valid_ratio`. Every other character (digits, punctuation, symbols)
    counts against the ratio.

    Args:
        text: Extracted text to check.
        min_valid_ratio: Minimum fraction of alphabetic/whitespace characters
            for the text to be considered healthy.

    Returns:
        True if the text looks corrupt, False otherwise.
    """
    # Nothing readable at all; this also guarantees len(text) > 0 below.
    if not text.strip():
        return True

    valid_chars = sum(c.isalpha() or c.isspace() for c in text)

    return (valid_chars / len(text)) < min_valid_ratio

In [6]:
import easyocr
import numpy as np
from pdf2image import convert_from_path
from PIL import Image

In [7]:
def directory_loader(dir_path: str | Path, file_ext: str) -> List[List[Document]]:
    """
    OCR every matching file of a directory, showing a progress bar.

    Each file is rendered to page images with `convert_from_path`, every page
    is read with EasyOCR (Spanish and English) and the recognized text is
    cleaned with `text_cleaner`.

    Args:
        dir_path: Directory holding the files to load.
        file_ext: Extension of the files to load, with or without the dot.

    Returns:
        One inner list per file, holding one `Document` per page. Each
        document's metadata has the file's "filename" and "filepath" plus the
        1-based "page" number.

    Raises:
        ValueError: Propagated from `search_dir` when the directory is
            missing, empty, or has no matching file.
    """
    files_info: List[Dict[str, str]] = search_dir(dir_path, file_ext)

    # Initialize EasyOCR reader for Spanish and English
    reader = easyocr.Reader(["es", "en"])

    loaded_docs: List[List[Document]] = []
    for f in tqdm(
        files_info,
        desc="LOADING PDF FILES",
        total=len(files_info),
        colour=EMERALD,
    ):
        f_pages_imgs = convert_from_path(f["filepath"])

        pages: List[Document] = []
        for page_number, page in enumerate(f_pages_imgs, start=1):
            # EasyOCR reads the text
            results = reader.readtext(np.array(page))
            # The recognized text is the second item of every result
            page_extracted_text = " ".join(text[1] for text in results)

            clean_text = text_cleaner(page_extracted_text)

            # Build a fresh dict per page so pages never share one mutable
            # metadata object, and record which page the text came from.
            page_metadata = {**f, "page": page_number}
            pages.append(Document(metadata=page_metadata, page_content=clean_text))

        loaded_docs.append(pages)

    return loaded_docs

In [None]:
# OCR every PDF found in PDF_DIR; the result holds one list of page Documents per file.
easy_docs = directory_loader(PDF_DIR, "pdf")

In [None]:
# Per-page health check: the filename is printed in red when the OCR text
# looks corrupt and in green otherwise.
for doc in easy_docs:
    for page in doc:
        color = RED if is_text_corrupt(page.page_content) else GREEN
        # Filenames may contain "[...]"; escape them so Rich prints them
        # literally instead of parsing them as markup tags.
        print(f"[{color}]{escape(page.metadata['filename'])}[/]")

# Full dump of the text recognized on every page.
for index, doc in enumerate(easy_docs):
    for page in doc:
        # OCR output is arbitrary text: without escaping, a bracketed fragment
        # would be treated as a Rich tag (dropped from the output or raising
        # a markup error).
        filename = escape(page.metadata["filename"])
        content = escape(page.page_content)
        print(
            f"[bold {BLUE}]> DOC N°:[/] [bold {WHITE}]{index}[/]\n",
            f"[bold {EMERALD}]> FILENAME:[/] [bold {WHITE}]{filename}[/]\n\n",
            f"[bold {YELLOW}]> CONTENT:[/]\n[{WHITE}]{content}[/]",
        )

---

    TESTING WITH BROKEN PDFS

In [13]:
# Single-file OCR run over PDF_FILE_2, done outside directory_loader() to
# inspect one problematic PDF page by page.
reader = easyocr.Reader(["es", "en"])

pages_imgs: List[Image.Image] = convert_from_path(PDF_FILE_2)

loaded_file: List[Document] = []
for index, page in enumerate(pages_imgs):
    # EasyOCR reads the text
    results = reader.readtext(np.array(page))
    # The recognized text is the second item of every result
    page_extracted_text = " ".join(text[1] for text in results)

    clean_text = text_cleaner(page_extracted_text)

    page_doc = Document(
        # "filename" and "filepath" mirror the keys produced by search_dir(),
        # so these pages can be handled like the ones from directory_loader().
        metadata={
            "filename": PDF_FILE_2.name,
            "filepath": str(PDF_FILE_2),
            "page": index + 1,
        },
        page_content=clean_text,
    )
    loaded_file.append(page_doc)

In [None]:
for page in loaded_file:
    # Path(...).name extracts the filename on any OS; splitting on "/" returns
    # the whole path when the separator is a backslash.
    filename = escape(Path(page.metadata["filepath"]).name)
    # OCR output may contain "[...]"; escape it so Rich prints it literally
    # instead of parsing it as markup.
    content = escape(page.page_content)
    print(
        f"[bold {EMERALD}]> METADATA:[/] [bold {WHITE}]{filename}[/]\n\n",
        f"[bold {YELLOW}]> CONTENT:[/]\n[{WHITE}]{content}[/]",
    )