# Proceso ETL: Migración de Datos de PostgreSQL a MongoDB

## Introducción

Este notebook implementa un proceso ETL (Extract, Transform, Load) para migrar datos de un modelo relacional en PostgreSQL a un modelo de documentos en MongoDB.

**Objetivo general:** Extraer datos de tablas relacionales en PostgreSQL, transformarlos en estructuras de documentos anidados adecuadas para MongoDB, y cargarlos eficientemente. Esto es útil para análisis de ventas con datos variables o para aprovechar la escalabilidad de MongoDB.

**Conceptos clave:**
- **ETL:** Extract (obtener datos), Transform (limpiar, enriquecer, reestructurar), Load (cargar en destino).
- **Por qué migrar a NoSQL:** MongoDB ofrece esquemas flexibles y mejor rendimiento para consultas anidadas, ideal para big data.
- **Mejoras:**
  - Manejo robusto de errores.
  - Conversión de tipos (e.g., `datetime.date` a `datetime.datetime` para compatibilidad con BSON).
  - Vistas previas en Jupyter para depuración.
  - Bulk inserts para eficiencia.
  - Validaciones para evitar procesar datos vacíos o inconsistentes.

El proceso se divide en tres etapas.

## Etapa 1: Extracción de Datos desde PostgreSQL

**Objetivo:** Conectar a PostgreSQL y extraer datos de las tablas del modelo ER (`clientes`, `empleados`, `ventas`, `facturas`, etc.). Usamos `psycopg2` para la conexión y `pandas` para cargar los datos en DataFrames.

**Conceptos agregados:**
- **Validación de conexión:** Verificamos la conexión y cerramos recursos.
- **Exploración inicial:** Mostramos las primeras filas de cada tabla para inspección.
- **Manejo de datos vacíos:** Evitamos procesar tablas vacías.

Nota: Asegúrate de tener PostgreSQL corriendo y las credenciales correctas.

In [4]:
# Importar librerías necesarias
import psycopg2
import pandas as pd
from IPython.display import display

def extract_data(host, dbname, user, password):
    """
    Extrae datos de todas las tablas relevantes desde PostgreSQL.
    :param host: Host de la base de datos.
    :param dbname: Nombre de la base de datos.
    :param user: Usuario de la base de datos.
    :param password: Contraseña de la base de datos.
    :return: Diccionario con DataFrames por tabla.
    """
    try:
        # Establecer conexión
        conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password)
        print("Conexión exitosa a PostgreSQL.")

        # Lista de tablas basada en el modelo ER
        tables = [
            'clientes', 'empleados', 'ventas', 'facturas', 'facturas_detalles',
            'productos', 'proveedores', 'ventas_productos', 'compras_clientes'
        ]

        data = {}
        for table in tables:
            query = f"SELECT * FROM {table}"  # En producción, quitar LIMIT para extraer todo
            df = pd.read_sql_query(query, conn)
            if df.empty:
                print(f"Advertencia: La tabla '{table}' está vacía.")
            else:
                data[table] = df
                print(f"\nDatos de la tabla '{table}' ({len(df)} filas):")
                display(df.head())  # Mostrar primeras filas para depuración

        conn.close()
        if not data:
            raise ValueError("No se extrajeron datos de ninguna tabla.")
        return data

    except Exception as e:
        print(f"Error en la extracción: {e}")
        return None

# Ejecutar extracción (reemplaza con tus credenciales)
data = extract_data(host='localhost', dbname='db_supermercado', user='postgres', password='123456')

Error en la extracción: connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "postgres"
connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "postgres"



## Etapa 2: Transformación de Datos

**Objetivo:** Convertir los datos relacionales en documentos JSON anidados para MongoDB. Esto incluye uniones, limpieza, anidamiento de subdocumentos (e.g., detalles de facturas y productos dentro de ventas) y agregaciones.

**Conceptos agregados:**
- **Validación de columnas:** Verificamos que las columnas necesarias existan antes de cada operación.
- **Conversión de tipos:** Convertimos `datetime.date` a `datetime.datetime` con timezone para compatibilidad con MongoDB.
- **Anidamiento eficiente:** Usamos `groupby` para crear arrays de subdocumentos, optimizando consultas en MongoDB.
- **Agregación:** Calculamos el campo `total_productos` para enriquecer los documentos.

Nota: Esta etapa denormaliza los datos para aprovechar las ventajas de MongoDB.

In [None]:
from datetime import datetime
import pandas as pd
from IPython.display import display

def convert_dates(obj):
    """Convierte objetos datetime.date a datetime.datetime con timezone en un diccionario o lista."""
    if isinstance(obj, dict):
        return {k: convert_dates(v) if isinstance(v, (dict, list)) else pd.to_datetime(v).tz_localize('UTC') if isinstance(v, datetime.date) else v for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_dates(item) for item in obj]
    else:
        return obj

def transforming_data(data):
    """
    Transforma datos tabulares de PostgreSQL a formato de documentos para MongoDB.
    :param data: Diccionario con DataFrames de las tablas.
    :return: Lista de diccionarios en formato JSON para MongoDB.
    """
    if not data or 'ventas' not in data or data['ventas'].empty:
        raise ValueError("No hay datos válidos en 'ventas' para transformar.")

    # 1. Limpieza inicial: Eliminar filas con IDs nulos
    for table in data:
        data[table] = data[table].dropna(subset=[col for col in data[table].columns if 'id' in col.lower()])

    print("Limpieza completada. Vista previa de 'ventas':")
    display(data['ventas'].head())

    # 2. Unir tablas relacionadas
    df_ventas = data['ventas'].copy().rename(columns={'id': 'venta_id'})

    # Verificar columnas antes de cada merge
    if 'id_empleado' not in df_ventas.columns:
        raise KeyError(f"No se encontró la columna 'id_empleado' en df_ventas. Columnas disponibles: {df_ventas.columns.tolist()}")
    df_ventas = df_ventas.merge(data['empleados'], left_on='id_empleado', right_on='id', how='left', suffixes=('', '_empleado'))
    df_ventas = df_ventas.drop(columns=['id_empleado', 'id_empleado'], errors='ignore')
    print("Después de unir con empleados. Columnas:", df_ventas.columns.tolist())

    # Verificar id_factura antes del merge con facturas
    if 'id_factura' not in df_ventas.columns:
        raise KeyError(f"No se encontró la columna 'id_factura' en df_ventas. Columnas disponibles: {df_ventas.columns.tolist()}")
    df_ventas = df_ventas.merge(data['facturas'].rename(columns={'id': 'id_factura'}), left_on='id_factura', right_on='id_factura', how='left')
    print("Después de unir con facturas. Columnas:", df_ventas.columns.tolist())

    # Anidar detalles de facturas
    if 'id_factura' not in data['facturas_detalles'].columns:
        raise KeyError(f"No se encontró la columna 'id_factura' en data['facturas_detalles']. Columnas disponibles: {data['facturas_detalles'].columns.tolist()}")
    df_detalles = data['facturas_detalles'].groupby('id_factura').apply(lambda x: x.to_dict('records')).reset_index(name='detalles_factura')
    df_ventas = df_ventas.merge(df_detalles, left_on='id_factura', right_on='id_factura', how='left')
    df_ventas['detalles_factura'] = df_ventas['detalles_factura'].apply(lambda x: x if isinstance(x, list) else [])
    print("Detalles de facturas anidados. Vista previa:")
    display(df_ventas[['venta_id', 'detalles_factura']].head())

    # Unir clientes
    if 'id_cliente' not in data['compras_clientes'].columns or 'id_venta' not in data['compras_clientes'].columns:
        raise KeyError(f"Faltan columnas en data['compras_clientes']. Columnas disponibles: {data['compras_clientes'].columns.tolist()}")
    df_compras = data['compras_clientes'].merge(data['clientes'], left_on='id_cliente', right_on='id', how='left', suffixes=('', '_cliente'))
    df_ventas = df_ventas.merge(df_compras, left_on='venta_id', right_on='id_venta', how='left').drop(columns=['id_venta', 'id_cliente'], errors='ignore')
    print("Después de unir con compras_clientes. Columnas:", df_ventas.columns.tolist())

    # Unir productos y proveedores
    if 'id_proveedor' not in data['productos'].columns:
        raise KeyError(f"No se encontró la columna 'id_proveedor' en data['productos']. Columnas disponibles: {data['productos'].columns.tolist()}")
    df_productos = data['productos'].merge(data['proveedores'], left_on='id_proveedor', right_on='id', how='left', suffixes=('', '_proveedor'))
    if 'id_producto' not in data['ventas_productos'].columns:
        raise KeyError(f"No se encontró la columna 'id_producto' en data['ventas_productos']. Columnas disponibles: {data['ventas_productos'].columns.tolist()}")
    df_ventas_prod = data['ventas_productos'].merge(df_productos, left_on='id_producto', right_on='id', how='left')
    df_ventas_prod_grouped = df_ventas_prod.groupby('id_venta').apply(lambda x: x.to_dict('records')).reset_index(name='products')
    df_ventas = df_ventas.merge(df_ventas_prod_grouped, left_on='venta_id', right_on='id_venta', how='left')
    df_ventas['products'] = df_ventas['products'].apply(lambda x: x if isinstance(x, list) else [])
    print("Productos y proveedores anidados. Vista previa:")
    display(df_ventas[['venta_id', 'products']].head())

    # 3. Filtrado: Solo ventas con productos
    df_ventas = df_ventas[df_ventas['products'].apply(len) > 0]

    # 4. Convertir fechas y seleccionar columnas
    if 'fecha' in df_ventas.columns:
        df_ventas['fecha'] = pd.to_datetime(df_ventas['fecha']).dt.tz_localize('UTC')
    columns_to_keep = [
        'venta_id', 'numero', 'codigo', 'fecha', 'hora', 'importe_total',
        'detalles_factura', 'products', 'nombre', 'apellido', 'cargo',
        'nombre_cliente', 'apellido_cliente', 'email'
    ]
    columns_to_keep = [col for col in columns_to_keep if col in df_ventas.columns]  # Filtrar columnas existentes
    df_ventas = df_ventas[columns_to_keep]

    # 5. Convertir a documentos y agregar agregaciones
    documents = convert_dates(df_ventas.to_dict('records'))
    for doc in documents:
        doc['total_productos'] = sum(prod.get('cantidad', 0) for prod in doc.get('products', []))

    print(f"Transformación completada. Total de documentos: {len(documents)}")
    display(pd.DataFrame(documents).head())
    return documents

# Ejecutar transformación
try:
    transformed_data = transforming_data(data)
except Exception as e:
    print(f"Error en la transformación: {e}")
    transformed_data = None

Limpieza completada. Vista previa de 'ventas':


Unnamed: 0,id,id_empleado,id_factura
0,1,1,1
1,2,1,2
2,3,2,3


Después de unir con empleados. Columnas: ['venta_id', 'id_factura', 'id', 'nombre', 'apellido', 'edad', 'fecha_nac', 'tipo_doc', 'nro_doc', 'cuil', 'direccion', 'nro_tel_princ', 'nro_tel_sec', 'email', 'cargo', 'antiguedad', 'fecha_ingreso', 'salario_anual']
Después de unir con facturas. Columnas: ['venta_id', 'id_factura', 'id', 'nombre', 'apellido', 'edad', 'fecha_nac', 'tipo_doc', 'nro_doc', 'cuil', 'direccion', 'nro_tel_princ', 'nro_tel_sec', 'email', 'cargo', 'antiguedad', 'fecha_ingreso', 'salario_anual', 'numero', 'codigo', 'fecha', 'hora', 'importe_total']
Detalles de facturas anidados. Vista previa:


Unnamed: 0,venta_id,detalles_factura
0,1,"[{'id': 1, 'id_factura': 1, 'tipo': 'B', 'desc..."
1,2,"[{'id': 2, 'id_factura': 2, 'tipo': 'B', 'desc..."
2,3,"[{'id': 3, 'id_factura': 3, 'tipo': 'B', 'desc..."


Después de unir con compras_clientes. Columnas: ['venta_id', 'id_factura', 'id_x', 'nombre_x', 'apellido_x', 'edad', 'fecha_nac', 'tipo_doc_x', 'nro_doc_x', 'cuil', 'direccion', 'nro_tel_princ_x', 'nro_tel_sec_x', 'email_x', 'cargo', 'antiguedad', 'fecha_ingreso', 'salario_anual', 'numero', 'codigo', 'fecha', 'hora', 'importe_total', 'detalles_factura', 'id_y', 'nombre_y', 'apellido_y', 'tipo_doc_y', 'nro_doc_y', 'nro_tel_princ_y', 'nro_tel_sec_y', 'email_y']
Productos y proveedores anidados. Vista previa:




Unnamed: 0,venta_id,products
0,1,"[{'id_x': 1, 'id_venta': 1, 'id_producto': 1, ..."
1,2,"[{'id_x': 4, 'id_venta': 2, 'id_producto': 2, ..."
2,3,[]


Error en la transformación: isinstance() arg 2 must be a type or tuple of types


## Etapa 3: Carga de Datos en MongoDB

**Objetivo:** Insertar los documentos transformados en una colección de MongoDB usando bulk insert para eficiencia.

**Corrección del error (NameError: 'transformed_data' is not defined):**
- Este error ocurre porque `transformed_data` no se definió debido al fallo en la Etapa 2. Al corregir el error de la Etapa 2 y añadir un bloque try-except, aseguramos que `transformed_data` esté definida o manejemos el caso en que no lo esté.

**Conceptos agregados:**
- **Bulk insert:** Usamos `insert_many` para operaciones atómicas y eficientes.
- **Manejo de errores:** Capturamos `BulkWriteError` para depuración.
- **Gestión de conexión:** Usamos una clase para conectar y cerrar conexiones.
- **Verificación:** Consultamos datos antes y después de la carga para validar.

Nota: En producción, considera crear índices (e.g., `db.ventas.createIndex({venta_id: 1})`) para optimizar consultas.

In [None]:
from pymongo import MongoClient
from pymongo.errors import BulkWriteError, InvalidOperation
from IPython.display import display
import pandas as pd

class MongoDBConnection:
    def __init__(self, host='localhost', port=27017):
        self.host = host
        self.port = port

    def connecting(self):
        return MongoClient(f'mongodb://{self.host}:{self.port}/')

# Instancia de conexión
instance_mongodb = MongoDBConnection()
client = instance_mongodb.connecting()
db = client['dio_analytics']
collection = db['ventas']

try:
    # Verificar datos existentes
    print("Datos existentes en MongoDB antes de la carga:")
    existing_data = list(collection.find().limit(5))
    display(pd.DataFrame(existing_data))

    # Cargar datos
    if transformed_data:
        result = collection.insert_many(transformed_data)
        print(f"{len(result.inserted_ids)} documentos insertados.")
    else:
        print("No hay datos transformados para insertar.")

    # Verificar después de la carga
    print("Datos en MongoDB después de la carga:")
    inserted_data = list(collection.find().limit(5))
    display(pd.DataFrame(inserted_data))

except BulkWriteError as bwe:
    print(f"Error en inserción: {bwe.details}")
except InvalidOperation as ioe:
    print(f"Error de operación con MongoDB: {ioe}")
except Exception as e:
    print(f"Error inesperado: {e}")
finally:
    # Cerrar la conexión solo después de todas las operaciones
    try:
        client.close()
        print("Conexión a MongoDB cerrada.")
    except Exception as e:
        print(f"Error al cerrar la conexión: {e}")


Datos existentes en MongoDB antes de la carga:


No hay datos transformados para insertar.
Datos en MongoDB después de la carga:


Conexión a MongoDB cerrada.


In [None]:
# Importar librerías necesarias
import psycopg2
import pandas as pd
import datetime
from IPython.display import display
from pymongo import MongoClient
from pymongo.errors import BulkWriteError, InvalidOperation

# Etapa 1: Extracción de Datos desde PostgreSQL
def extract_data(host, dbname, user, password):
    """
    Extrae datos de todas las tablas relevantes desde PostgreSQL.
    :param host: Host de la base de datos.
    :param dbname: Nombre de la base de datos.
    :param user: Usuario de la base de datos.
    :param password: Contraseña de la base de datos.
    :return: Diccionario con DataFrames por tabla.
    """
    try:
        # Establecer conexión
        conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password)
        print("Conexión exitosa a PostgreSQL.")

        # Lista de tablas basada en el modelo ER
        tables = [
            'clientes', 'empleados', 'ventas', 'facturas', 'facturas_detalles',
            'productos', 'proveedores', 'ventas_productos', 'compras_clientes'
        ]

        data = {}
        for table in tables:
            query = f"SELECT * FROM {table}"
            df = pd.read_sql_query(query, conn)
            if df.empty:
                print(f"Advertencia: La tabla '{table}' está vacía.")
            else:
                data[table] = df
                print(f"\nDatos de la tabla '{table}' ({len(df)} filas):")
                display(df.head())  # Mostrar primeras filas para depuración
        conn.close()
        if not data:
            raise ValueError("No se extrajeron datos de ninguna tabla.")
        return data

    except Exception as e:
        print(f"Error en la extracción: {e}")
        return None

# Etapa 2: Transformación de Datos (mejorada)
def convert_dates(obj):
    """Convierte objetos datetime.date y datetime.time a formatos compatibles con MongoDB."""
    if isinstance(obj, dict):
        return {k: convert_dates(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_dates(item) for item in obj]
    elif isinstance(obj, datetime.date) and not isinstance(obj, datetime.datetime):
        return datetime.datetime.combine(obj, datetime.time.min).replace(tzinfo=datetime.timezone.utc)
    elif isinstance(obj, datetime.time):
        return obj.isoformat()  # Convertir datetime.time a cadena ISO
    elif isinstance(obj, datetime.datetime):
        return obj.replace(tzinfo=datetime.timezone.utc) if not obj.tzinfo else obj
    return obj

def transforming_data(data):
    """
    Transforma datos tabulares de PostgreSQL a formato de documentos para MongoDB.
    :param data: Diccionario con DataFrames de las tablas.
    :return: Lista de diccionarios en formato JSON para MongoDB.
    """
    if not data or 'ventas' not in data or data['ventas'].empty:
        raise ValueError("No hay datos válidos en 'ventas' para transformar.")

    # 1. Limpieza inicial: Eliminar filas con IDs nulos
    for table in data:
        data[table] = data[table].dropna(subset=[col for col in data[table].columns if 'id' in col.lower()])

    print("Limpieza completada. Vista previa de 'ventas':")
    display(data['ventas'].head())

    # 2. Unir tablas relacionadas
    df_ventas = data['ventas'].copy().rename(columns={'id': 'venta_id'})

    # Unir con empleados
    if 'id_empleado' not in df_ventas.columns:
        raise KeyError(f"No se encontró la columna 'id_empleado' en df_ventas.")
    df_ventas = df_ventas.merge(data['empleados'].rename(columns={'id': 'empleado_id'}),
                               left_on='id_empleado', right_on='empleado_id', how='left')
    df_ventas.drop(columns=['id_empleado', 'empleado_id'], errors='ignore', inplace=True)
    df_ventas.rename(columns={
        'nombre': 'empleado_nombre', 'apellido': 'empleado_apellido', 'email': 'empleado_email',
        'tipo_doc': 'empleado_tipo_doc', 'nro_doc': 'empleado_nro_doc',
        'nro_tel_princ': 'empleado_nro_tel_princ', 'nro_tel_sec': 'empleado_nro_tel_sec'
    }, inplace=True)
    print("Después de unir con empleados. Columnas:", df_ventas.columns.tolist())

    # Unir con facturas
    if 'id_factura' not in df_ventas.columns:
        raise KeyError(f"No se encontró la columna 'id_factura' en df_ventas.")
    df_ventas = df_ventas.merge(data['facturas'].rename(columns={'id': 'factura_id'}),
                               left_on='id_factura', right_on='factura_id', how='left')
    df_ventas.drop(columns=['factura_id'], errors='ignore', inplace=True)
    print("Después de unir con facturas. Columnas:", df_ventas.columns.tolist())

    # Anidar detalles de facturas
    if 'id_factura' not in data['facturas_detalles'].columns:
        raise KeyError(f"No se encontró la columna 'id_factura' en data['facturas_detalles'].")
    df_detalles = data['facturas_detalles'].groupby('id_factura').apply(
        lambda x: x.drop(columns=['id_factura']).to_dict('records')).reset_index(name='detalles_factura')
    df_ventas = df_ventas.merge(df_detalles, left_on='id_factura', right_on='id_factura', how='left')
    df_ventas['detalles_factura'] = df_ventas['detalles_factura'].apply(lambda x: x if isinstance(x, list) else [])
    print("Detalles de facturas anidados. Vista previa:")
    display(df_ventas[['venta_id', 'detalles_factura']].head())

    # Unir clientes
    if 'id_cliente' not in data['compras_clientes'].columns or 'id_venta' not in data['compras_clientes'].columns:
        raise KeyError(f"Faltan columnas en data['compras_clientes'].")
    df_compras = data['compras_clientes'].merge(data['clientes'].rename(columns={'id': 'cliente_id'}),
                                              left_on='id_cliente', right_on='cliente_id', how='left')
    df_compras.drop(columns=['cliente_id', 'id_cliente'], errors='ignore', inplace=True)
    df_compras.rename(columns={
        'nombre': 'cliente_nombre', 'apellido': 'cliente_apellido', 'email': 'cliente_email',
        'tipo_doc': 'cliente_tipo_doc', 'nro_doc': 'cliente_nro_doc',
        'nro_tel_princ': 'cliente_nro_tel_princ', 'nro_tel_sec': 'cliente_nro_tel_sec'
    }, inplace=True)
    df_ventas = df_ventas.merge(df_compras, left_on='venta_id', right_on='id_venta', how='left')
    df_ventas.drop(columns=['id_venta'], errors='ignore', inplace=True)
    print("Después de unir con compras_clientes. Columnas:", df_ventas.columns.tolist())

    # Unir productos y proveedores
    data['proveedores'].rename(columns={'id': 'proveedor_id', 'empresa': 'proveedor_nombre', 'email': 'proveedor_email'}, inplace=True)
    if 'id_proveedor' not in data['productos'].columns:
        raise KeyError(f"No se encontró la columna 'id_proveedor' en data['productos'].")
    df_productos = data['productos'].merge(data['proveedores'], left_on='id_proveedor', right_on='proveedor_id', how='left')
    df_productos.drop(columns=['proveedor_id'], errors='ignore', inplace=True)

    if 'id_producto' not in data['ventas_productos'].columns:
        raise KeyError(f"No se encontró la columna 'id_producto' en data['ventas_productos'].")
    df_ventas_prod = data['ventas_productos'].merge(df_productos, left_on='id_producto', right_on='id', how='left')
    df_ventas_prod.drop(columns=['id_producto', 'id'], errors='ignore', inplace=True)
    df_ventas_prod = df_ventas_prod.loc[:, ~df_ventas_prod.columns.duplicated()]
    df_ventas_prod_grouped = df_ventas_prod.groupby('id_venta').apply(
        lambda x: x.drop(columns=['id_venta']).to_dict('records')).reset_index(name='products')
    df_ventas = df_ventas.merge(df_ventas_prod_grouped, left_on='venta_id', right_on='id_venta', how='left')
    df_ventas['products'] = df_ventas['products'].apply(lambda x: x if isinstance(x, list) else [])
    df_ventas.drop(columns=['id_venta'], errors='ignore', inplace=True)
    print("Productos y proveedores anidados. Vista previa:")
    display(df_ventas[['venta_id', 'products']].head())

    # 3. Filtrado: Solo ventas con productos
    df_ventas = df_ventas[df_ventas['products'].apply(len) > 0]

    # 4. Convertir fechas y seleccionar columnas
    if 'fecha' in df_ventas.columns:
        df_ventas['fecha'] = pd.to_datetime(df_ventas['fecha'])

    columns_to_keep = [
        'venta_id', 'numero', 'codigo', 'fecha', 'hora', 'importe_total',
        'detalles_factura', 'products', 'empleado_nombre', 'empleado_apellido', 'cargo',
        'cliente_nombre', 'cliente_apellido', 'cliente_email'
    ]
    columns_to_keep = [col for col in columns_to_keep if col in df_ventas.columns]
    df_ventas = df_ventas[columns_to_keep]

    # 5. Vista previa antes de la conversión de fechas
    print("Vista previa de datos antes de convertir fechas:")
    display(df_ventas.head())

    # 6. Convertir a documentos y agregar agregaciones
    documents = df_ventas.to_dict('records')
    documents = convert_dates(documents)
    for doc in documents:
        doc['total_productos'] = sum(prod.get('cantidad', 0) for prod in doc.get('products', []))

    print(f"Transformación completada. Total de documentos: {len(documents)}")
    display(pd.DataFrame(documents).head())
    return documents

# Etapa 3: Carga de Datos en MongoDB (mejorada con revisión adicional)
class MongoDBConnection:
    def __init__(self, host='localhost', port=27017):
        self.host = host
        self.port = port

    def connecting(self):
        return MongoClient(f'mongodb://{self.host}:{self.port}/')

# Ejecutar el proceso ETL
try:
    data = extract_data(host='localhost', dbname='db_supermercado', user='postgres', password='970366')  # Reemplaza con tus credenciales
    if data is None:
        raise ValueError("No se pudieron extraer datos de PostgreSQL.")

    transformed_data = transforming_data(data)
    if transformed_data is None:
        raise ValueError("No se pudieron transformar los datos.")

    # Instancia de conexión
    instance_mongodb = MongoDBConnection()
    client = instance_mongodb.connecting()
    db = client['dio_analytics']
    collection = db['ventas']

    # Verificar datos existentes
    print("Datos existentes en MongoDB antes de la carga:")
    existing_data = list(collection.find({}, {'_id': 0}).limit(5))  # Excluir _id para mejor visualización
    display(pd.DataFrame(existing_data))

    # Cargar datos
    if transformed_data:
        result = collection.insert_many(transformed_data)
        print(f"{len(result.inserted_ids)} documentos insertados.")
    else:
        print("No hay datos transformados para insertar.")

    # Verificar después de la carga
    print("Datos en MongoDB después de la carga (todos los documentos):")
    inserted_data = list(collection.find({}, {'_id': 0}))  # Mostrar todos sin _id
    display(pd.DataFrame(inserted_data))

    # Consulta de ejemplo para revisión
    if collection.count_documents({}) > 0:
        print("Ejemplo de consulta: Total de documentos en la colección:", collection.count_documents({}))
        print("Ejemplo de consulta: Primera venta:")
        first_venta = collection.find_one({}, {'_id': 0})
        display(pd.DataFrame([first_venta]))

        # Nueva consulta: Total de importe por cliente
        pipeline = [
            {"$group": {
                "_id": "$cliente_email",
                "total_importe": {"$sum": "$importe_total"}
            }},
            {"$sort": {"total_importe": -1}},
            {"$limit": 5}
        ]
        print("Ejemplo de consulta: Top 5 clientes por total de importe:")
        top_clientes = list(collection.aggregate(pipeline))
        display(pd.DataFrame(top_clientes))
    else:
        print("No hay documentos para consultar.")

except Exception as e:
    print(f"Error inesperado: {e}")
finally:
    # Cerrar la conexión
    try:
        client.close()
        print("Conexión a MongoDB cerrada.")
    except Exception as e:
        print(f"Error al cerrar la conexión: {e}")

Conexión exitosa a PostgreSQL.

Datos de la tabla 'clientes' (3 filas):


Unnamed: 0,id,nombre,apellido,tipo_doc,nro_doc,nro_tel_princ,nro_tel_sec,email
0,1,Martin,Gutierrez,DNI,4598676890,+549116574839,-,martin.gutierrez@gmail.com
1,2,Sofia,Aguilar,DNI,3494758583,+549116475834,-,sofi-aguilar-cordoba@hotmail.com
2,3,Ramiro,Martinez,DNI,37849567333,-,-,ramiroMartinez3564@gmail.com



Datos de la tabla 'empleados' (8 filas):


Unnamed: 0,id,nombre,apellido,edad,fecha_nac,tipo_doc,nro_doc,cuil,direccion,nro_tel_princ,nro_tel_sec,email,cargo,antiguedad,fecha_ingreso,salario_anual
0,1,Macarena,Gutierrez,32,1989-04-06,DNI,334565243,12-334565243-7,Av. Gaona 352,1164575222,1164575222,maca.gutieerez756@hotmail.com,Cajera,2 a�os,2019-03-01,45000.0
1,2,Damian,gutierrez,39,1978-09-14,DNI,33869556,20-33869556-3,Av Alberdi 05,5491176844456,+5491157684445,damian_gut.756@gmail.com,Cajero,3 a�os y 6 meses,2018-03-04,65000.0
2,3,Marcos,Castro,45,1971-05-01,DNI,48967156,33489671564,Figueroa Alcorta 22,1178654356,+5491178654356,marcosCastro2002_lop@hotmail.com,Repositor,4 a�s,2017-09-06,70000.0
3,4,Marcelo,Perez,28,1988-03-17,DNI,39345679,12-39345679-9,Carabobo 06,5491138765433,-,MarceloPerez@gmail.com,Repositor,9 meses,2019-04-14,56000.0
4,5,Marcelo,Castro,28,1989-04-06,DNI,39886386,14-39886386-5,Los Patos 123,549118567453,+549118567453,marcelocastro.746_jj@gmail.com,Gerente,2 a�os y 5 meses,2019-02-11,120000.0



Datos de la tabla 'ventas' (3 filas):


Unnamed: 0,id,id_empleado,id_factura
0,1,1,1
1,2,1,2
2,3,2,3



Datos de la tabla 'facturas' (3 filas):


Unnamed: 0,id,numero,codigo,fecha,hora,importe_total
0,1,56,67352,2025-05-19,22:26:43.653480,3000.0
1,2,57,67673,2025-05-19,22:26:43.653480,1200.0
2,3,58,67645,2025-05-19,22:26:43.653480,3400.0



Datos de la tabla 'facturas_detalles' (3 filas):


Unnamed: 0,id,id_factura,tipo,descr_factura,costo_asoc,iva,medio_de_pago,descr_pago
0,1,1,B,Consumidor Final,86.0,8.0,EFECTIVO,Un Solo Pago
1,2,2,B,Consumidor Final,86.0,8.0,TARJETA CREDITO,Un solo Pago
2,3,3,B,Consumidor Final,86.0,8.0,TARJETA DEBITO,Un solo Pago



Datos de la tabla 'productos' (20 filas):


Unnamed: 0,id,id_proveedor,codigo,imagen,nombre,marca,tipo,grupo,peso,precio_unidad,stock
0,1,1,AET78U9,https://http2.mlstatic.com/D_NQ_NP_792586-MLA4...,Agua de Mesa sin Gas Nestl� Bid�n 6.3L,Nestl�,Bebidas,Agua,6.3,195.6,500
1,2,4,UI7R8O9,https://http2.mlstatic.com/D_NQ_NP_916232-MLA4...,Vino blanco Cuesta Del Madero 750 cc.,Cuesta del Madero,Bebidas,Vinos,0.75,187.75,200
2,3,5,YUT2563,https://ardiaprod.vteximg.com.br/arquivos/ids/...,Gaseosa cola Cunnington 2.25L,Cunnington,Bebidas,Gaseosas,2.25,90.0,300
3,4,6,YUT2564,https://http2.mlstatic.com/D_NQ_NP_864108-MLA4...,Gaseosa Coca Cola sabor original 1.25L,Coca cola,Bebidas,Gaseosas,1.25,108.7,300
4,5,8,COPR8O6,https://d3ugyf2ht6aenh.cloudfront.net/stores/8...,Tapa de Asado de Novillo x kg,Gen�rico,Carnes y Pescados,Carne Vacuna,1.0,649.0,100



Datos de la tabla 'proveedores' (8 filas):


Unnamed: 0,id,empresa,tipo_producto,direccion,nro_tel_princ,nro_tel_sec,email
0,1,Nestl�,Bebidas- L�Cteos - Productos Frescos,Vicente L�pez 223,0800-999-8100,0800-999-8102,nestle.consultas@gmail.com
1,2,Huella Natural,Frutas - Verduras,Av. San Viroler 222,237635333,-,huellas.consultas@gmail.com
2,3,Sancor,Lacteos - Productos Frecos,Av. Alvarez Jonte 3544,011 4072-5887,011 4072-5885,sancor.consultas@gmail.com
3,4,Cuesta Del Madero,Bebidas,Las Caremorias 222,011 4567-3744,-,cuestaDelMadero.consultas@gmail.com
4,6,Coca-Cola,Bebidas,Av. Entre R�os 221,4567-0989,4635-2843,coca.cola.company@gmail.com



Datos de la tabla 'ventas_productos' (6 filas):


Unnamed: 0,id,id_venta,id_producto,cantidad
0,1,1,1,4
1,2,1,2,5
2,3,1,3,5
3,4,2,2,2
4,5,2,3,4



Datos de la tabla 'compras_clientes' (2 filas):


Unnamed: 0,id,id_venta,id_cliente
0,1,1,1
1,2,2,2


Limpieza completada. Vista previa de 'ventas':


Unnamed: 0,id,id_empleado,id_factura
0,1,1,1
1,2,1,2
2,3,2,3


Después de unir con empleados. Columnas: ['venta_id', 'id_factura', 'empleado_nombre', 'empleado_apellido', 'edad', 'fecha_nac', 'empleado_tipo_doc', 'empleado_nro_doc', 'cuil', 'direccion', 'empleado_nro_tel_princ', 'empleado_nro_tel_sec', 'empleado_email', 'cargo', 'antiguedad', 'fecha_ingreso', 'salario_anual']
Después de unir con facturas. Columnas: ['venta_id', 'id_factura', 'empleado_nombre', 'empleado_apellido', 'edad', 'fecha_nac', 'empleado_tipo_doc', 'empleado_nro_doc', 'cuil', 'direccion', 'empleado_nro_tel_princ', 'empleado_nro_tel_sec', 'empleado_email', 'cargo', 'antiguedad', 'fecha_ingreso', 'salario_anual', 'numero', 'codigo', 'fecha', 'hora', 'importe_total']
Detalles de facturas anidados. Vista previa:


Unnamed: 0,venta_id,detalles_factura
0,1,"[{'id': 1, 'tipo': 'B', 'descr_factura': 'Cons..."
1,2,"[{'id': 2, 'tipo': 'B', 'descr_factura': 'Cons..."
2,3,"[{'id': 3, 'tipo': 'B', 'descr_factura': 'Cons..."


Después de unir con compras_clientes. Columnas: ['venta_id', 'id_factura', 'empleado_nombre', 'empleado_apellido', 'edad', 'fecha_nac', 'empleado_tipo_doc', 'empleado_nro_doc', 'cuil', 'direccion', 'empleado_nro_tel_princ', 'empleado_nro_tel_sec', 'empleado_email', 'cargo', 'antiguedad', 'fecha_ingreso', 'salario_anual', 'numero', 'codigo', 'fecha', 'hora', 'importe_total', 'detalles_factura', 'id', 'cliente_nombre', 'cliente_apellido', 'cliente_tipo_doc', 'cliente_nro_doc', 'cliente_nro_tel_princ', 'cliente_nro_tel_sec', 'cliente_email']
Productos y proveedores anidados. Vista previa:


Unnamed: 0,venta_id,products
0,1,"[{'id_x': 1, 'cantidad': 4, 'id_y': 1, 'id_pro..."
1,2,"[{'id_x': 4, 'cantidad': 2, 'id_y': 2, 'id_pro..."
2,3,[]


Vista previa de datos antes de convertir fechas:


Unnamed: 0,venta_id,numero,codigo,fecha,hora,importe_total,detalles_factura,products,empleado_nombre,empleado_apellido,cargo,cliente_nombre,cliente_apellido,cliente_email
0,1,56,67352,2025-05-19,22:26:43.653480,3000.0,"[{'id': 1, 'tipo': 'B', 'descr_factura': 'Cons...","[{'id_x': 1, 'cantidad': 4, 'id_y': 1, 'id_pro...",Macarena,Gutierrez,Cajera,Martin,Gutierrez,martin.gutierrez@gmail.com
1,2,57,67673,2025-05-19,22:26:43.653480,1200.0,"[{'id': 2, 'tipo': 'B', 'descr_factura': 'Cons...","[{'id_x': 4, 'cantidad': 2, 'id_y': 2, 'id_pro...",Macarena,Gutierrez,Cajera,Sofia,Aguilar,sofi-aguilar-cordoba@hotmail.com


Transformación completada. Total de documentos: 2


Unnamed: 0,venta_id,numero,codigo,fecha,hora,importe_total,detalles_factura,products,empleado_nombre,empleado_apellido,cargo,cliente_nombre,cliente_apellido,cliente_email,total_productos
0,1,56,67352,2025-05-19 00:00:00+00:00,22:26:43.653480,3000.0,"[{'id': 1, 'tipo': 'B', 'descr_factura': 'Cons...","[{'id_x': 1, 'cantidad': 4, 'id_y': 1, 'id_pro...",Macarena,Gutierrez,Cajera,Martin,Gutierrez,martin.gutierrez@gmail.com,14
1,2,57,67673,2025-05-19 00:00:00+00:00,22:26:43.653480,1200.0,"[{'id': 2, 'tipo': 'B', 'descr_factura': 'Cons...","[{'id_x': 4, 'cantidad': 2, 'id_y': 2, 'id_pro...",Macarena,Gutierrez,Cajera,Sofia,Aguilar,sofi-aguilar-cordoba@hotmail.com,11


Datos existentes en MongoDB antes de la carga:


2 documentos insertados.
Datos en MongoDB después de la carga (todos los documentos):


Unnamed: 0,venta_id,numero,codigo,fecha,hora,importe_total,detalles_factura,products,empleado_nombre,empleado_apellido,cargo,cliente_nombre,cliente_apellido,cliente_email,total_productos
0,1,56,67352,2025-05-19,22:26:43.653480,3000.0,"[{'id': 1, 'tipo': 'B', 'descr_factura': 'Cons...","[{'id_x': 1, 'cantidad': 4, 'id_y': 1, 'id_pro...",Macarena,Gutierrez,Cajera,Martin,Gutierrez,martin.gutierrez@gmail.com,14
1,2,57,67673,2025-05-19,22:26:43.653480,1200.0,"[{'id': 2, 'tipo': 'B', 'descr_factura': 'Cons...","[{'id_x': 4, 'cantidad': 2, 'id_y': 2, 'id_pro...",Macarena,Gutierrez,Cajera,Sofia,Aguilar,sofi-aguilar-cordoba@hotmail.com,11


Ejemplo de consulta: Total de documentos en la colección: 2
Ejemplo de consulta: Primera venta:


Unnamed: 0,venta_id,numero,codigo,fecha,hora,importe_total,detalles_factura,products,empleado_nombre,empleado_apellido,cargo,cliente_nombre,cliente_apellido,cliente_email,total_productos
0,1,56,67352,2025-05-19,22:26:43.653480,3000.0,"[{'id': 1, 'tipo': 'B', 'descr_factura': 'Cons...","[{'id_x': 1, 'cantidad': 4, 'id_y': 1, 'id_pro...",Macarena,Gutierrez,Cajera,Martin,Gutierrez,martin.gutierrez@gmail.com,14


Ejemplo de consulta: Top 5 clientes por total de importe:


Unnamed: 0,_id,total_importe
0,martin.gutierrez@gmail.com,3000.0
1,sofi-aguilar-cordoba@hotmail.com,1200.0


Conexión a MongoDB cerrada.
