# Preparación de Datos con PySpark

Este notebook realiza la preparación de datos usando PySpark para procesar documentos PDF.

In [None]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col
import numpy as np
import subprocess

# Instalar pdf2image si no está disponible
try:
    import pdf2image
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pdf2image"])
finally:
    from pdf2image import convert_from_path

import cv2

# Configurar Spark
print("Configurando Spark...")
spark = SparkSession.builder \
    .appName("PDFPreprocessing") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()
print("Spark configurado correctamente.")

# Configurar directorios
RAW_DIR = 'data/raw'
PROCESSED_DIR = 'data/processed'

# Validar y crear directorios si no existen
if not os.path.exists(RAW_DIR):
    print(f"Directorio {RAW_DIR} no existe. Creándolo...")
    os.makedirs(RAW_DIR, exist_ok=True)
else:
    print(f"Directorio {RAW_DIR} ya existe.")

if not os.path.exists(PROCESSED_DIR):
    print(f"Directorio {PROCESSED_DIR} no existe. Creándolo...")
    os.makedirs(PROCESSED_DIR, exist_ok=True)
else:
    print(f"Directorio {PROCESSED_DIR} ya existe.")

print(f"Directorios configurados: RAW_DIR={RAW_DIR}, PROCESSED_DIR={PROCESSED_DIR}")

def preprocess_image(image, target_size=(224, 224)):
    """Preprocesa una imagen para el modelo"""
    try:
        print("Iniciando preprocesamiento de imagen...")
        
        # Convertir a escala de grises
        if len(image.shape) == 3:
            print("Convirtiendo imagen a escala de grises...")
            gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        else:
            gray = image
            print("Imagen ya está en escala de grises.")
            
        # Redimensionar
        print(f"Redimensionando imagen a {target_size}...")
        resized = cv2.resize(gray, target_size)
        
        # Normalizar
        print("Normalizando imagen...")
        normalized = resized.astype(np.float32) / 255.0
        
        print("Preprocesamiento de imagen completado.")
        return normalized
    except Exception as e:
        print(f"Error preprocesando imagen: {str(e)}")
        return None

def process_pdf(pdf_path):
    """Procesa un PDF y retorna información de sus páginas"""
    try:
        print(f"Procesando {pdf_path}...")
        # Convertir PDF a imágenes
        pages = convert_from_path(pdf_path)
        pdf_name = os.path.basename(pdf_path).replace('.pdf', '')
        
        # Crear directorio para este PDF
        pdf_dir = os.path.join(PROCESSED_DIR, pdf_name)
        os.makedirs(pdf_dir, exist_ok=True)
        
        # Procesar cada página
        page_info = []
        for i, page in enumerate(pages):
            # Guardar imagen
            page_path = os.path.join(pdf_dir, f'page_{i}.png')
            page.save(page_path)
            
            # Convertir a array y preprocesar
            img_array = np.array(page)
            processed = preprocess_image(img_array)
            
            page_info.append({
                'pdf_name': pdf_name,
                'page_number': i,
                'path': page_path,
                'features': processed.flatten().tolist() if processed is not None else None
            })
            
        print(f"Procesamiento de {pdf_name} completado.")
        return page_info
    except Exception as e:
        print(f"Error procesando {pdf_path}: {str(e)}")
        return []

# Función UDF para Spark
@udf(returnType=ArrayType(StructType([
    StructField("pdf_name", StringType(), False),
    StructField("page_number", IntegerType(), False),
    StructField("path", StringType(), False),
    StructField("features", ArrayType(FloatType()), True)
])))
def process_pdf_udf(pdf_path):
    return process_pdf(pdf_path)

# Crear DataFrame de archivos PDF
print("Obteniendo lista de archivos PDF...")
pdf_files = [os.path.join(RAW_DIR, f) for f in os.listdir(RAW_DIR) if f.endswith('.pdf')]
print(f"Total de archivos PDF encontrados: {len(pdf_files)}")

pdf_df = spark.createDataFrame(pdf_files, StringType()).toDF("pdf_path")
print(f"Total de archivos PDF encontrados en DataFrame: {pdf_df.count()}")

# Procesar PDFs y crear DataFrame
print("Procesando PDFs con Spark...")
processed_df = pdf_df.withColumn("pages", process_pdf_udf(col("pdf_path")))
processed_df = processed_df.selectExpr("inline(pages)")
print("Procesamiento de PDFs completado.")

# Guardar DataFrame procesado
output_path = os.path.join(PROCESSED_DIR, "processed_pages.parquet")
print(f"Guardando DataFrame en {output_path}...")
processed_df.write.mode("overwrite").parquet(output_path)
print("DataFrame guardado correctamente.")

# Mostrar estadísticas del procesamiento
print(f"Total de PDFs procesados: {pdf_df.count()}")
print(f"Total de páginas procesadas: {processed_df.count()}")
print("\nDistribución de páginas por PDF:")
processed_df.groupBy("pdf_name").count().show()

# Verificar valores nulos o problemas
print("\nVerificación de calidad:")
processed_df.summary().show()

Configurando Spark...
Spark configurado correctamente.
Directorio data/raw ya existe.
Directorio data/processed ya existe.
Directorios configurados: RAW_DIR=data/raw, PROCESSED_DIR=data/processed
Obteniendo lista de archivos PDF...
Total de archivos PDF encontrados: 1
Total de archivos PDF encontrados en DataFrame: 1
Procesando PDFs con Spark...
Procesamiento de PDFs completado.
Guardando DataFrame en data/processed\processed_pages.parquet...


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Program Files\spark-3.5.4-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1231, in main
  File "C:\Program Files\spark-3.5.4-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1067, in read_udfs
  File "C:\Program Files\spark-3.5.4-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 529, in read_single_udf
  File "C:\Program Files\spark-3.5.4-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 90, in read_command
  File "C:\Program Files\spark-3.5.4-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 174, in _read_with_length
    return self.loads(obj)
  File "C:\Program Files\spark-3.5.4-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pdf2image'


In [None]:
import os
from pdf2image import convert_from_path

RAW_DIR = 'data/raw'
PROCESSED_DIR = 'data/processed'

# Validar y crear directorios si no existen
if not os.path.exists(RAW_DIR):
    print(f"Directorio {RAW_DIR} no existe. Creándolo...")
    os.makedirs(RAW_DIR, exist_ok=True)
else:
    print(f"Directorio {RAW_DIR} ya existe.")

if not os.path.exists(PROCESSED_DIR):
    print(f"Directorio {PROCESSED_DIR} no existe. Creándolo...")
    os.makedirs(PROCESSED_DIR, exist_ok=True)
else:
    print(f"Directorio {PROCESSED_DIR} ya existe.")

print(f"Directorios configurados: RAW_DIR={RAW_DIR}, PROCESSED_DIR={PROCESSED_DIR}")

def process_pdf(pdf_path):
    """Procesa un PDF y guarda sus páginas como imágenes"""
    try:
        print(f"Procesando {pdf_path}...")
        # Convertir PDF a imágenes
        pages = convert_from_path(pdf_path)
        pdf_name = os.path.basename(pdf_path).replace('.pdf', '')
        
        # Crear directorio para este PDF
        pdf_dir = os.path.join(PROCESSED_DIR, pdf_name)
        os.makedirs(pdf_dir, exist_ok=True)
        
        # Guardar cada página como imagen
        for i, page in enumerate(pages):
            page_path = os.path.join(pdf_dir, f'page_{i}.png')
            page.save(page_path)
            print(f"Guardada página {i} de {pdf_name} en {page_path}")
            
        print(f"Procesamiento de {pdf_name} completado.")
    except Exception as e:
        print(f"Error procesando {pdf_path}: {str(e)}")

# Procesar todos los PDFs en el directorio RAW_DIR
pdf_files = [os.path.join(RAW_DIR, f) for f in os.listdir(RAW_DIR) if f.endswith('.pdf')]
for pdf_file in pdf_files:
    process_pdf(pdf_file)