In [30]:
import pandas as pd
import requests
import logging
from typing import List, Optional
from requests.exceptions import RequestException

In [31]:
# Configurar logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

In [32]:
# Configuración de archivos y endpoints
#ruta_hired_employees = "archivos/hired_employees.csv"
ruta_hired_employees = "archivos/employees2.csv"
ruta_departments = "archivos/departments.csv"
ruta_jobs = "archivos/Jobs.csv"

In [33]:
columns_names_hired_employees = ["id", "name", "datetime", "department_id", "job_id"]
columns_names_departments = ["id", "department"]
columns_names_jobs = ["id", "job"]

In [34]:
API_URL_hired_employees = "http://localhost:8000/hired_employees/batch_insert"
API_URL_deparments = "http://localhost:8000/departments/batch_insert"
API_URL_jobs = "http://localhost:8000/jobs/batch_insert"

In [35]:
def leer_csv_en_chunks(ruta: str, chunk_size: int) -> pd.io.parsers.TextFileReader:
    """Lee un archivo CSV por chunks."""
    return pd.read_csv(ruta, header=None, chunksize=chunk_size)

In [36]:
def transformar_chunk(chunk: pd.DataFrame, columnas: List[str]) -> pd.DataFrame:
    """Asigna columnas, convierte tipos y reemplaza NaN por None."""
    chunk.columns = columnas
    chunk = chunk.astype(object)
    return chunk.where(pd.notnull(chunk), None)

In [37]:
def convertir_a_json(chunk: pd.DataFrame) -> List[dict]:
    """Convierte un DataFrame a lista de diccionarios (JSON)."""
    return chunk.to_dict(orient="records")

In [38]:
def enviar_a_api(api_url: str, data: List[dict]) -> Optional[requests.Response]:
    """Envía datos en formato JSON a la API y devuelve la respuesta."""
    try:
        response = requests.post(api_url, json=data, timeout=10)
        response.raise_for_status()
        return response
    except RequestException as e:
        # Mostrar el contenido de la respuesta si está disponible
        if hasattr(e, 'response') and e.response is not None:
            logger.error(f"Error al enviar datos a la API ({api_url}): {e.response.text}")
        else:
            logger.error(f"Error al enviar datos a la API ({api_url}): {e}")
        return None

In [None]:
def procesar_y_enviar_csv(
    ruta: str,
    columnas: List[str],
    api_url: str,
    chunk_size: int = 2000
) -> None:
    """Proceso completo de lectura, transformación y envío a API."""
    for i, chunk in enumerate(leer_csv_en_chunks(ruta, chunk_size)):
        df = transformar_chunk(chunk, columnas)
        json_data = convertir_a_json(df)
        response = enviar_a_api(api_url, json_data)

        if response:
            logger.info(f"[{api_url}] Chunk {i+1} enviado con éxito ({len(json_data)} registros)")
        else:
            logger.error(f"[{api_url}] Error en el envío del chunk {i+1}")
            break  # Detener en caso de error

In [40]:
def main() -> None:
    """Función principal del proceso ETL."""
    logger.info("Iniciando carga de datos...")

    procesar_y_enviar_csv(ruta_departments, columns_names_departments, API_URL_deparments)
    procesar_y_enviar_csv(ruta_jobs, columns_names_jobs, API_URL_jobs)
    procesar_y_enviar_csv(ruta_hired_employees, columns_names_hired_employees, API_URL_hired_employees)

    logger.info("Carga de datos finalizada.")

In [43]:
if __name__ == "__main__":
    main()

2025-08-10 23:57:47,425 - INFO - Iniciando carga de datos...
2025-08-10 23:57:49,508 - INFO - [http://localhost:8000/departments/batch_insert] Chunk 1 enviado con éxito (12 registros)
2025-08-10 23:57:51,580 - INFO - [http://localhost:8000/jobs/batch_insert] Chunk 1 enviado con éxito (183 registros)
2025-08-10 23:57:53,687 - ERROR - Error al enviar datos a la API (http://localhost:8000/hired_employees/batch_insert): {"detail":"No puedes insertar más de 2000 registros por petición"}
2025-08-10 23:57:53,688 - ERROR - [http://localhost:8000/hired_employees/batch_insert] Error en el envío del chunk 1
2025-08-10 23:57:53,690 - INFO - Carga de datos finalizada.
