In [1]:
pip install PyMuPDF

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
import fitz #PyMuPDF

## Libraries

In [1]:
import dask.dataframe as dd
import glob
import pandas as pd
from google.api_core.client_options import ClientOptions
from google.cloud import documentai
import time
from tqdm import tqdm
import os

## OCR google

In [2]:
LOCATION = 'us'
PROJECT_ID = 'poc-accionclimatica-agrilac'
PROCESSOR_ID = '9cbdccf92c290be'
FILE_PATH = '/home/jupyter/data/forrajes/*'
MIME_TYPE = "application/pdf"

In [3]:
list_blobs = glob.glob(FILE_PATH)

In [4]:
files_df = pd.DataFrame(list_blobs, columns=['file_path'])

In [9]:
files_dd = dd.from_pandas(files_df,npartitions = 20) 

In [14]:
# The full resource name of the processor, e.g.:
# projects/project-id/locations/location/processor/processor-id
# You must create new processors in the Cloud Console first
procesados = []
con_errores = []
def get_text_from_pdf(file_path:str):
    try:
        docai_client = documentai.DocumentProcessorServiceClient(
        client_options=ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com"))
        RESOURCE_NAME = docai_client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)
        # Read the file into memory
        with open(file_path, "rb") as image:
            image_content = image.read()
        raw_document = documentai.RawDocument(content=image_content, mime_type=MIME_TYPE)
        request = documentai.ProcessRequest(name=RESOURCE_NAME, raw_document=raw_document)
        result = docai_client.process_document(request=request)
        document_object = result.document
        procesados.append(file_path)
        return document_object.text
    except:
        con_errores.append(file_path)
        return "NA"

In [7]:
files_df.head(1)

Unnamed: 0,file_path
0,/home/jupyter/data/forrajes/7257.pdf


In [8]:
# New function whitout dask

def process_pdf_folder(df: pd.DataFrame):
    file_paths = df['file_path'].tolist()
    responses = []
    for file in tqdm(file_paths, desc="Procesando PDFs", unit="archivo"):
        try:
            response_text = get_text_from_pdf(file)
        except Exception as e:
            print(f"Error procesando el archivo {file_path}: {e}")
            response_text = None
        responses.append(response_text)
        #time.sleep(0.5)
    df['response'] = responses
    df['id'] = df['file_path'].apply(lambda x: (x.split('/')[-1]).split('.')[0])
    return df

In [9]:
df_results = process_pdf_folder(files_df)

Procesando PDFs:   1%|          | 77/6513 [02:32<3:32:46,  1.98s/archivo]

KeyboardInterrupt



## Upload files to storage

In [8]:
from google.cloud import storage

storage_client = storage.Client()

In [None]:
def upload_files_to_gcs(bucket_name: str, source_folder: str, destination_folder: str):
    """Sube archivos desde una carpeta local a un bucket de GCS."""
    bucket = storage_client.bucket(bucket_name)
    file_list = [f for f in os.listdir(source_folder) if f.endswith('.pdf')]
    
    # Subir cada archivo al bucket de GCS
    for file_name in tqdm(file_list, desc="Subiendo archivos", unit="archivo"):
        source_file_path = os.path.join(source_folder, file_name)
        destination_blob_name = f"{destination_folder}/{file_name}"
        
        # Crear un blob en GCS y subir el archivo
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_path)
        
        print(f"Archivo {file_name} subido a {destination_blob_name} en el bucket {bucket_name}.")

In [None]:
BUCKET_NAME = "genebanks"
SOURCE_FOLDER = "/home/jupyter/data/forrajes"
DESTINATION_FOLDER = "zone=landing/forages-ciat/curacion-forrajes"

#upload_files_to_gcs(BUCKET_NAME, SOURCE_FOLDER, DESTINATION_FOLDER)

## OCR from Storage

In [None]:
files_df.head()

In [26]:
files_df['pdf_name'] = files_df['file_path'].apply(lambda x: (x.split('/')[-1]))

In [47]:
def get_text_from_gcs(gcs_uri: str):
    """Procesa el archivo PDF en GCS utilizando su URI y devuelve el texto extraído."""

    # Initialize the DocumentProcessorServiceClient
    client = documentai.DocumentProcessorServiceClient(
        client_options=ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")
    )

    # Construct the resource name
    resource_name = client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)

    # Specify GCS source using GcsSource
    gcs_source = documentai.GcsSource(uri=gcs_uri)

    # Create the input configuration for GCS
    input_config = documentai.DocumentInputConfig(gcs_source=gcs_source, mime_type="application/pdf")

    # Create the request with the input configuration
    request = documentai.ProcessRequest(name=resource_name, input_config=input_config)

    # Process the document with Document AI
    result = client.process_document(request=request)

    # Extract the text
    document = result.document
    return document.text

In [48]:
def process_pdf_folder(df: pd.DataFrame, bucket_name: str):
    """Procesa archivos PDF almacenados en GCS y aplica OCR directamente desde el bucket."""
    df['pdf_name'] = df['file_path'].apply(lambda x: (x.split('/')[-1]))
    file_paths = df['pdf_name'].tolist()
    responses = []
    error_files = []

    for file in tqdm(file_paths, desc="Procesando PDFs", unit="archivo"):
        try:
            gcs_uri = f"gs://{bucket_name}/{file}"
            response_text = get_text_from_gcs(gcs_uri)
        except Exception as e:
            print(f"Error procesando el archivo {file}: {e}")
            response_text = None
            error_files.append(file)
        responses.append(response_text)

        time.sleep(0.5)

    df['response'] = responses
    df['id'] = df['pdf_name'].apply(lambda x: (x.split('.')[0]))

    # Mostrar archivos que generaron error
    if error_files:
        print(f"Archivos que generaron error: {len(error_files)}")
        for err_file in error_files:
            print(err_file)
    
    return df, error_files

In [49]:
bucket_name = 'genebanks/zone=landing/forages-ciat/curacion-forrajes'

df_results, errors = process_pdf_folder(files_df, bucket_name)

Procesando PDFs:   0%|          | 0/6513 [00:00<?, ?archivo/s]

Error procesando el archivo 7257.pdf: module 'google.cloud.documentai' has no attribute 'GcsSource'


Procesando PDFs:   0%|          | 1/6513 [00:00<55:18,  1.96archivo/s]

Error procesando el archivo 25036.pdf: module 'google.cloud.documentai' has no attribute 'GcsSource'


Procesando PDFs:   0%|          | 2/6513 [00:01<55:14,  1.96archivo/s]

Error procesando el archivo 10352.pdf: module 'google.cloud.documentai' has no attribute 'GcsSource'


Procesando PDFs:   0%|          | 3/6513 [00:01<55:15,  1.96archivo/s]

Error procesando el archivo 18977.pdf: module 'google.cloud.documentai' has no attribute 'GcsSource'


Procesando PDFs:   0%|          | 4/6513 [00:02<55:16,  1.96archivo/s]

Error procesando el archivo 19302.pdf: module 'google.cloud.documentai' has no attribute 'GcsSource'


Procesando PDFs:   0%|          | 5/6513 [00:02<59:42,  1.82archivo/s]

Error procesando el archivo 5579.pdf: module 'google.cloud.documentai' has no attribute 'GcsSource'





KeyboardInterrupt: 

In [23]:
df_results.head(1)

NameError: name 'df_results' is not defined

In [None]:
# CSV
files_df.to_csv(process_pdf_folder, index=False, encoding='utf-8')

## Dask

In [15]:
import dask.array as da
from dask.diagnostics import ProgressBar
_col = {'response': object, 'name': object}
with ProgressBar():
    files_dd['response'] = files_dd.map_partitions(
        lambda df: df['file_path'].apply(lambda x: get_text_from_pdf(x)),
        meta=_col).compute(scheduler='processes')

[########################################] | 100% Completed | 65m 21s


In [16]:
print("finalizo")

finalizo


In [17]:
files_df = files_dd.compute()
files_df['id'] = files_df.file_path.apply(lambda x: (x.split('/')[-1]).split('.')[0])

In [18]:
files_df

Unnamed: 0,file_path,response,id
0,/home/jupyter/data/forrajes/7257.pdf,CAT Especie: Programa de Pastos Tropicales TAR...,7257
1,/home/jupyter/data/forrajes/25036.pdf,CAL Especie: Programa de Pastos Tropicales TAR...,25036
2,/home/jupyter/data/forrajes/10352.pdf,10352 CIAT No. 5 B Tipo de planta: B 1 - Postr...,10352
3,/home/jupyter/data/forrajes/18977.pdf,CC LAT Especie: 1. Colector(es): 2. Donante Pr...,18977
4,/home/jupyter/data/forrajes/19302.pdf,CMAT Programa de Pastos Tropicales TARJETA DE ...,19302
...,...,...,...
6508,/home/jupyter/data/forrajes/11525.pdf,11525 CIAT No. Б 6 CMATT Programa de Pastos Tr...,11525
6509,/home/jupyter/data/forrajes/25048.pdf,CCMAL Especie: Programa de Pastos Tropicales T...,25048
6510,/home/jupyter/data/forrajes/15450.pdf,CMATT Programa de Pastos Tropicales TARJETA DE...,15450
6511,/home/jupyter/data/forrajes/18990.pdf,CCMAT Especie: Programa de Pastos Tropicales T...,18990


In [19]:
files_df

Unnamed: 0,file_path,id
0,/home/jupyter/data/forrajes/7257.pdf,7257
1,/home/jupyter/data/forrajes/25036.pdf,25036
2,/home/jupyter/data/forrajes/10352.pdf,10352
3,/home/jupyter/data/forrajes/18977.pdf,18977
4,/home/jupyter/data/forrajes/19302.pdf,19302
...,...,...
6508,/home/jupyter/data/forrajes/11525.pdf,11525
6509,/home/jupyter/data/forrajes/25048.pdf,25048
6510,/home/jupyter/data/forrajes/15450.pdf,15450
6511,/home/jupyter/data/forrajes/18990.pdf,18990


In [None]:
# Parquet
files_df.to_parquet("/home/jupyter/data/processed_pdfs.parquet", engine='pyarrow')

In [None]:
# CSV
files_df.to_csv("/home/jupyter/data/processed_pdfs.csv", index=False, encoding='utf-8')

## Manual test

In [None]:
RESOURCE_NAME = docai_client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)

In [None]:
# Read the file into memory
with open(FILE_PATH, "rb") as image:
    image_content = image.read()

In [None]:
# Load Binary Data into Document AI RawDocument Object
raw_document = documentai.RawDocument(content=image_content, mime_type=MIME_TYPE)

In [None]:
# Configure the process request
request = documentai.ProcessRequest(name=RESOURCE_NAME, raw_document=raw_document)

In [None]:
# Use the Document AI client to process the sample form
result = docai_client.process_document(request=request)

In [None]:
document_object = result.document
print("Document processing complete.")
print(f"Text: {document_object.text}")

## Convert pdf pages to images

In [None]:
# Está función lee un archivo en pdf y guarda cada pagina del pdf como una imagen png en el folder de salida
def process_pdf(pdf_path, output_folder):
    filename = os.path.splitext(os.path.basename(pdf_path))[0]
    folder_path = os.path.join(output_folder, filename)
    os.makedirs(folder_path, exist_ok=True)
    
    with fitz.open(pdf_path) as doc:
        for page_number, page in enumerate(doc, start=1):
            image_path = os.path.join(folder_path, f"page_{page_number}.png")
            page.get_pixmap().save(image_path)
            print(f"Guardada la página {page_number} del archivo {pdf_path} como {image_path}")

In [None]:
def save_images(pdf_folder, output_folder):
    for pdf_file in os.listdir(pdf_folder):
        if pdf_file.endswith('.pdf'):
            process_pdf(os.path.join(pdf_folder, pdf_file), output_folder)

In [None]:
pdf_folder = '/home/jupyter/data/1_test_forages'
output_folder = '/home/jupyter/data/2_images_test_forages'

In [None]:
save_images(pdf_folder, output_folder)

In [None]:
# Esta función lee un archivo PDF y guarda cada página como imagen PNG en el folder de salida con alta resolución
def process_pdf(pdf_path, output_folder, dpi=300):
    filename = os.path.splitext(os.path.basename(pdf_path))[0]
    folder_path = os.path.join(output_folder, filename)
    os.makedirs(folder_path, exist_ok=True)
    
    with fitz.open(pdf_path) as doc:
        for page_number, page in enumerate(doc, start=1):
            image_path = os.path.join(folder_path, f"_{page_number}.png")
            # Se ajusta la resolución a través del zoom (dpi / 72)
            zoom = dpi / 72
            matrix = fitz.Matrix(zoom, zoom)  # Matriz de escalado para aumentar la resolución
            pix = page.get_pixmap(matrix=matrix, alpha=True)
            pix.save(image_path)
            print(f"Guardada la página {page_number} del archivo {pdf_path} como {image_path} en {dpi} DPI")

# Esta función procesa todos los archivos PDF en un folder
def save_images(pdf_folder, output_folder, dpi=300):
    for pdf_file in os.listdir(pdf_folder):
        if pdf_file.endswith('.pdf'):
            process_pdf(os.path.join(pdf_folder, pdf_file), output_folder, dpi)

In [None]:
save_images(pdf_folder, output_folder, dpi=300)