Definimos un enum con todas las categorías posibles de objeto.

In [1]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pydantic import BaseModel
from enum import Enum

class ClaseObjetoEnum(str, Enum):
    """Enumeración de los valores permitidos para clase_objeto."""
    AGRICULTURA = "Agricultura, Ganadería, Caza, Silvicultura y Pesca"
    MINAS = "Explotación de Minas y Canteras"
    INDUSTRIA = "Industria Manufacturera"
    ELECTRICIDAD = "Suministro de Electricidad, Gas, Vapor y Aire Acondicionado"
    AGUA = "Suministro de Agua, Cloacas, Gestión de Residuos y Saneamiento"
    CONSTRUCCION = "Construcción"
    COMERCIO = "Comercio al por Mayor y al por Menor"
    TRANSPORTE = "Servicio de Transporte y Almacenamiento"
    ALOJAMIENTO = "Servicios de Alojamiento y Servicios de Comida"
    INFO_COM = "Información y Comunicaciones"
    FINANZAS = "Intermediación Financiera y Servicios de Seguros"
    INMOBILIARIO = "Servicios Inmobiliarios"
    PROFESIONALES = "Servicios Profesionales, Científicos y Técnicos"
    ADMIN = "Actividades Administrativas y Servicios de Apoyo"
    ADM_PUBLICA = "Administración Pública, Defensa y Seguridad Social"
    ENSENANZA = "Enseñanza"
    SALUD = "Salud Humana y Servicios Sociales"
    ARTE = "Servicios Artísticos, Culturales, Deportivos y de Esparcimiento"
    ASOCIACIONES = "Servicios de Asociaciones y Servicios Personales"
    HOGARES = "Actividades de Hogares que Emplean Personal Doméstico"
    EXTRATERRITORIAL = "Actividades de Organizaciones y Órganos Extraterritoriales"
    OTROS = "Otros"

Definimos el esquema en Pydantic

In [2]:
class Socio(BaseModel):
    dni: str
    cuit_cuil: str
    profesion: str
    domicilio: str
    estado_civil: str
    nacionalidad: str
    nombre_completo: str
    fecha_nacimiento: str
    cantidad_acciones: int

class Directivo(BaseModel):
    nombre_completo: str
    cargo: str

class Directivo(BaseModel):
    nombre_completo: str
    cargo: str

class Sociedad(BaseModel):
    nombre: str
    aviso_id: str
    sede_social: str
    fecha_inicio: str
    duracion: str
    clase_objeto: ClaseObjetoEnum
    # objeto: str
    
    capital_social: int
    valor_por_accion: int
    cantidad_acciones: int

    socios: list[Socio]
    directorio: list[Directivo]

Nos conectamos a la BD MySQL

In [3]:
from sqlalchemy import Column, Integer, String, Date, Float, ForeignKey, create_engine, text, Text, Numeric, table, column, update
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import os

DB_USER = os.getenv("MYSQL_USER")
DB_PASSWORD = os.getenv("MYSQL_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("MYSQL_DATABASE")

DATABASE_URL = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"

engine = create_engine(DATABASE_URL)
connection = engine.connect()

Construimos el modelo ERD en SQLAlchemy, y creamos las tablas en la BD si no existen.

In [4]:
Base = declarative_base()

class TableSociedad(Base):
    __tablename__ = 'sociedades'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    nombre = Column(String(255), index=True, nullable=False)
    aviso_id = Column(String(100), index=True, nullable= False)
    sede_social = Column(String(500))
    fecha_inicio = Column(Date)
    duracion = Column(String(100))
    
    clase_objeto = Column(String(100))
    # objeto = Column(Text)
    
    capital_social = Column(Float)
    valor_por_accion = Column(Float)
    cantidad_acciones = Column(Integer)
    
    id_direccion = Column(Integer, ForeignKey('direcciones.id'), nullable=True)

    socios = relationship("TableSocio", back_populates="sociedad", cascade="all, delete-orphan")
    directorio = relationship("TableDirectivo", back_populates="sociedad", cascade="all, delete-orphan")
    
    direccion_sede = relationship("TableDirecciones", back_populates="sociedades_sede", foreign_keys=[id_direccion])

class TableSocio(Base):
    __tablename__ = 'socios'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    sociedad_id = Column(Integer, ForeignKey('sociedades.id'), nullable=False)
    
    dni = Column(String(20), index=True)
    cuit_cuil = Column(String(20), index=True)
    profesion = Column(String(100))
    domicilio = Column(String(500))
    estado_civil = Column(String(50))
    nacionalidad = Column(String(50))
    nombre_completo = Column(String(255))
    fecha_nacimiento = Column(Date)
    cantidad_acciones = Column(Integer)

    id_direccion = Column(Integer, ForeignKey('direcciones.id'), nullable=True)
    sociedad = relationship("TableSociedad", back_populates="socios")
    direccion = relationship("TableDirecciones", back_populates="socios")


class TableDirectivo(Base):
    __tablename__ = 'directorio'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    sociedad_id = Column(Integer, ForeignKey('sociedades.id'), nullable=False)
    
    nombre_completo = Column(String(255))
    cargo = Column(String(100))
    sociedad = relationship("TableSociedad", back_populates="directorio")


class TableDirecciones(Base):
    __tablename__ = 'direcciones'

    id = Column(Integer, primary_key=True, autoincrement=True)
    calle = Column(String(255))
    numero = Column(Integer)
    piso_depto = Column(String(100))
    provincia = Column(String(100))
    codigo_postal = Column(String(50))
    lat = Column(Numeric(9, 6))
    lon = Column(Numeric(9, 6))


    sociedades_sede = relationship("TableSociedad", back_populates="direccion_sede")
    socios = relationship("TableSocio", back_populates="direccion")

print("Creating tables if they don't exist...")
Base.metadata.create_all(engine)
print("Tables created successfully.")

Creating tables if they don't exist...
Tables created successfully.


In [5]:
from datetime import datetime
from dateutil.parser import parse

NUM_AVISOS_PER_BATCH = 200

def get_avisos_to_process(session):
    query = text("SELECT * FROM avisos WHERE procesado = false and id_rubro in (1110, 1130) LIMIT :limit_num;")
    records = session.execute(query, {"limit_num": NUM_AVISOS_PER_BATCH}) 
    avisos = records.fetchall()

    if not avisos:
        print("No new records to process.")
        return []
    return avisos

def build_input_text(avisos):
    input_text = ""
    for aviso in avisos:
        input_text += f'aviso_id: {aviso.aviso_id} Sociedad: {aviso.sociedad} Aviso: {aviso.detalle_aviso}'
    return input_text

def parse_date_dmy(date_str: str | None):
    """Parses a dd/mm/yy or dd/mm/yyyy string into a Date object."""
    if not date_str:
        return None
    try:
        return parse(date_str)
    except ValueError:
        try:
            return datetime.strptime(date_str, '%Y-%m-%d').date()
        except ValueError:
            print(f"Warning: Could not parse date '{date_str}'. Returning None.")
            return None

def add_sociedad_to_session(session, sociedad):
    """
    Crea los objetos SQLAlchemy para una sociedad y sus relaciones
    y los AÑADE a la sesión. No hace commit ni actualiza el aviso.
    """
    try:
        # Create the main SQLAlchemy TableSociedad object
        new_sociedad = TableSociedad(
            nombre=sociedad.nombre,
            aviso_id=sociedad.aviso_id,
            sede_social=sociedad.sede_social,
            fecha_inicio=parse_date_dmy(sociedad.fecha_inicio),
            duracion=sociedad.duracion,
            clase_objeto=sociedad.clase_objeto.value,
            # objeto=sociedad.objeto,
            capital_social=sociedad.capital_social,
            valor_por_accion=sociedad.valor_por_accion,
            cantidad_acciones=sociedad.cantidad_acciones
        )
        
        # Create and add related Socios
        for socio in sociedad.socios:
            new_socio = TableSocio(
                dni=socio.dni,
                cuit_cuil=socio.cuit_cuil,
                profesion=socio.profesion,
                domicilio=socio.domicilio,
                estado_civil=socio.estado_civil,
                nacionalidad=socio.nacionalidad,
                nombre_completo=socio.nombre_completo,
                fecha_nacimiento=parse_date_dmy(socio.fecha_nacimiento),
                cantidad_acciones=socio.cantidad_acciones
            )
            new_sociedad.socios.append(new_socio)
        
        # Create and add related Directivos
        for directivo in sociedad.directorio:
            new_directivo = TableDirectivo(
                nombre_completo=directivo.nombre_completo,
                cargo=directivo.cargo
            )
            new_sociedad.directorio.append(new_directivo)
        
        session.add(new_sociedad)
        print(f"Added society for aviso_id {sociedad.aviso_id} to session.")
    
    except Exception as e:
        # Lanza la excepción hacia arriba para que el bucle principal la maneje
        print(f"!!-- ERROR processing/adding record {sociedad.aviso_id}: {e}")
        raise e # Importante: relanzar la excepción

In [10]:
# Add this import
from concurrent.futures import ThreadPoolExecutor, as_completed

# Modified worker that processes a pre-assigned batch
def process_batch(worker_id: int, avisos_batch):
    """Worker function that processes a pre-assigned batch of avisos."""
    if not avisos_batch:
        print(f"Worker {worker_id}: Empty batch. Nothing to do.")
        return
        
    session = SessionLocal()
    try:
        batch_aviso_ids = {aviso.aviso_id for aviso in avisos_batch}
        input_text = build_input_text(avisos_batch)
        
        response = client.models.generate_content(
            model="gemini-2.5-flash-preview-09-2025",
            contents=input_text,
            config={
                "system_instruction": SYSTEM_PROMPT,
                "response_mime_type": "application/json",
                "response_schema": list[Sociedad]
            },
        )

        sociedades: list[Sociedad] = response.parsed
        successful_sociedades_added = 0
        
        for sociedad in sociedades:
            try:
                add_sociedad_to_session(session, sociedad)
                session.commit()
                print(f"Worker {worker_id}: Committed {sociedad.aviso_id}.")
                successful_sociedades_added += 1
            except Exception as e:
                print(f"Worker {worker_id}: Failed to add society {sociedad.aviso_id}: {e}")
                session.rollback()
                
    except Exception as e:
        print(f"Worker {worker_id}: Fatal error: {e}")
    finally:
        if batch_aviso_ids:
            avisos_table = table('avisos', column('aviso_id'), column('procesado'))
            update_stmt = update(avisos_table).where(avisos_table.c.aviso_id.in_(batch_aviso_ids)).values(procesado=1)
            session.execute(update_stmt)
            session.commit()
            print(f'worker {worker_id}: set to processed {batch_aviso_ids}')

        session.close()
        print(f"Worker {worker_id}: Session closed.")

# Main orchestration logic
from google import genai

client = genai.Client()

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
SYSTEM_PROMPT = """
Eres un sistema experto en estructurar movimientos societarios realizados en la República Argentina. 
Extrae toda la información clave y verídica de los siguientes avisos societarios:
"""

MAX_ITERATIONS = 200
n_iters = 0

while True:
    # Fetch all avisos once and split them
    print("Fetching all unprocessed avisos...")
    initial_session = SessionLocal()
    all_avisos = get_avisos_to_process(initial_session)
    initial_session.close()

    if not all_avisos:
        break
    else:
        # Split into chunks for workers
        NUM_WORKERS = 20
        chunk_size = (len(all_avisos) + NUM_WORKERS - 1) // NUM_WORKERS
        avisos_batches = [all_avisos[i:i + chunk_size] for i in range(0, len(all_avisos), chunk_size)]

        print(f"Starting parallel processing with {NUM_WORKERS} workers for {len(all_avisos)} total avisos...")
        
        with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
            futures = [executor.submit(process_batch, i, batch) for i, batch in enumerate(avisos_batches)]
            
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Worker failed: {e}")

        n_iters += 1
        print(f'iteration {n_iters}')
        if n_iters >= MAX_ITERATIONS:
            break
    print("All processing completed.")

Fetching all unprocessed avisos...
Starting parallel processing with 20 workers for 200 total avisos...
Added society for aviso_id A732702 to session.
Worker 12: Committed A732702.
Added society for aviso_id A732729 to session.
Worker 12: Committed A732729.
Added society for aviso_id A732705 to session.
Worker 12: Committed A732705.
Added society for aviso_id A732733 to session.
Worker 12: Committed A732733.
Added society for aviso_id A732710 to session.
Worker 12: Failed to add society A732710: (mysql.connector.errors.DataError) 1406 (22001): Data too long for column 'profesion' at row 1
[SQL: INSERT INTO socios (sociedad_id, dni, cuit_cuil, profesion, domicilio, estado_civil, nacionalidad, nombre_completo, fecha_nacimiento, cantidad_acciones, id_direccion) VALUES (%(sociedad_id)s, %(dni)s, %(cuit_cuil)s, %(profesion)s, %(domicilio)s, %(estado_civil)s, %(nacionalidad)s, %(nombre_completo)s, %(fecha_nacimiento)s, %(cantidad_acciones)s, %(id_direccion)s)]
[parameters: {'sociedad_id': 12