In [1]:
# !pip install pandas sqlalchemy mysql mysqlclient psycopg2-binary pyodbc oracledb mariadb mysql-connector-python

In [2]:
# ==========================================================
# 1. CONFIGURACIÓN DE RUTAS Y MÓDULOS
# ==========================================================
import sys
import os
# import importlib 
# import extractors.extractors
# importlib.reload(extractors.extractors)

# Obtener la ruta de la carpeta que contiene el notebook
notebook_dir = os.getcwd() 

# Añadir la carpeta 'etl_modules' a la ruta de búsqueda de Python (sys.path)
# Esto permite importar el contenido de 'config', 'extractors', etc., directamente.
sys.path.append(os.path.join(notebook_dir, 'etl_modules'))

print(f"Ruta de módulos añadida: {os.path.join(notebook_dir, 'etl_modules')}")


Ruta de módulos añadida: C:\Users\edela\Documents\Análisi Innovador\Taller ingesta Repo\Analisis_Innovador\app ingesta\etl_modules


In [3]:
# ----------------------------------------------------------
# 2. IMPORTACIÓN DE CLASES Y FUNCIONES
# ----------------------------------------------------------

# Ahora las importaciones son más limpias y reflejan la estructura de carpetas
from extractors.extractors import (
    MySqlSourceExtractor, OracleExtractor, PostgresExtractor, 
    SqlServerExtractor, MariaDBExtractor
)
from transformers.transformers import standardize_columns, clean_and_cast_data
from loaders.loaders import SQLiteLoader
import pandas as pd

In [4]:
# ==========================================================
# 3. LÓGICA DEL PIPELINE ETL
# ==========================================================
import importlib 
import extractors.extractors
importlib.reload(extractors.extractors)

# Definición de la query genérica para ventas
VENTAS_QUERY = "SELECT * FROM venta_data" 

# Lista de extractores (DIP)
extractor_map = {
    "MySQL_Source": MySqlSourceExtractor(),
    "Oracle_Source": OracleExtractor(),
    "Postgres_Source": PostgresExtractor(),
    "SQLServer_Source": SqlServerExtractor(),
    "MariaDB_Source": MariaDBExtractor()
}

all_consolidated_dfs = []

In [5]:
# ==========================================================
# 4. PROCESAMIENTO ETL ITERATIVO
# ==========================================================
print("--- INICIO DEL PIPELINE DE VENTAS (E X T R A C C I Ó N) ---")

for source_name, extractor in extractor_map.items():
    print(f"\n[PROCESANDO FUENTE: {source_name}]")
    
    # E: Extracción
    df_raw = extractor.fetch_data(VENTAS_QUERY)
    
    if df_raw.empty:
        continue

    # T: Transformación y Estandarización
    df_transformed = standardize_columns(df_raw.copy(), source_name)
    df_cleaned = clean_and_cast_data(df_transformed)
    
    # Asignar la fuente para auditoría
    df_cleaned['source_db'] = source_name
    
    # Almacenar para consolidación
    all_consolidated_dfs.append(df_cleaned)
    print(f"-> DataFrame de {source_name} listo para consolidación.")

--- INICIO DEL PIPELINE DE VENTAS (E X T R A C C I Ó N) ---

[PROCESANDO FUENTE: MySQL_Source]
-> Extrayendo datos desde: mysql
-> Extracción exitosa. Filas: 10000
-> Columnas estandarizadas para MySQL_Source.
-> DataFrame de MySQL_Source listo para consolidación.

[PROCESANDO FUENTE: Oracle_Source]
-> Extrayendo datos desde: oracle
-> Extracción exitosa. Filas: 10000
-> Columnas estandarizadas para Oracle_Source.
-> DataFrame de Oracle_Source listo para consolidación.

[PROCESANDO FUENTE: Postgres_Source]
-> Extrayendo datos desde: postgresql
-> Extracción exitosa. Filas: 10000
-> Columnas estandarizadas para Postgres_Source.
-> DataFrame de Postgres_Source listo para consolidación.

[PROCESANDO FUENTE: SQLServer_Source]
-> Extrayendo datos desde: mssql
ERROR de extracción en mssql: (pyodbc.OperationalError) ('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Error no recuperable durante una búsqueda en base de datos.\r\n (11003) (SQLDriverConnect); [08001] [Mic

In [6]:
# ==========================================================
# 5. CONSOLIDACIÓN Y CARGA
# ==========================================================

if not all_consolidated_dfs:
    print("\nFATAL: No se extrajeron datos de ninguna fuente. Terminando.")
else:
    print("\n--- INICIO DE CONSOLIDACIÓN ---")
    df_final = pd.concat(all_consolidated_dfs, ignore_index=True, sort=False)
    
    # T: Transformación final
    df_final.fillna(value={'amount': 0.0, 'customer_id': 'N/A'}, inplace=True)
    
    print(f"Total de registros a cargar: {len(df_final)}")
    print(df_final.head())
    
    # L: Carga (Usando el nuevo SQLite Loader)
    print("\n--- INICIO DE CARGA (L O A D) EN SQLITE ---")
    sqlite_loader = SQLiteLoader() # CAMBIO: Usar el nuevo Loader
    sqlite_loader.load_data(df_final, table_name="ventas_consolidado")
    
    print("\n>>> PIPELINE ETL FINALIZADO EXITOSAMENTE <<<")


--- INICIO DE CONSOLIDACIÓN ---
Total de registros a cargar: 40000
            productos  cantidad  valor_individual     forma_pago  \
0          Rosquillas         4           1235.25  Transferencia   
1        Arroz Blanco         3           2129.40  Transferencia   
2           Salchicha         1           2018.40  Transferencia   
3  Detergente Líquido         4           1944.80  Transferencia   
4     Harina de Trigo         4           1816.50  Transferencia   

         clientes  tiendas     vendedores     source_db  
0      Juan Pérez  Central       Ana Soto  MySQL_Source  
1  Valeria Moreno  Central    Carlos Ruiz  MySQL_Source  
2   Marta Sánchez  Central  Ricardo Gómez  MySQL_Source  
3       Ana Gómez  Central       Ana Soto  MySQL_Source  
4    Miguel Pardo  Central  Ricardo Gómez  MySQL_Source  

--- INICIO DE CARGA (L O A D) EN SQLITE ---
-> Iniciando carga de 40000 registros a SQLite en tabla 'ventas_consolidado'...
-> Carga finalizada exitosamente en SQLite.

>>> P