# Proceso ETL Completo

Este notebook muestra cu00f3mo implementar un proceso ETL (Extract, Transform, Load) completo utilizando nuestro framework.

## Configuraciu00f3n del entorno

In [1]:
import sys
import os

# Agregar el directorio rau00edz al path para poder importar los mu00f3dulos
sys.path.append(os.path.abspath('../'))

# Importar mu00f3dulos ETL
from src.etl.extract import extract_from_csv, extract_from_api
from src.etl.transform import transform_raw_data
from src.etl.load import load_to_final_table

# Importar utilidades
from src.utils.database import engine, execute_query

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sqlalchemy import text
import json
from datetime import datetime

# Configurar visualizaciones
sns.set(style="whitegrid")
plt.rcParams["figure.figsize"] = (12, 8)

## 1. Extracciu00f3n (Extract)

Primero, vamos a crear algunos datos de ejemplo y guardarlos en un archivo CSV para simular la extracciu00f3n de datos.

In [2]:
# Crear datos de ejemplo
np.random.seed(42)  # Para reproducibilidad

# Crear un DataFrame con datos simulados de ventas
data = {
    'producto_id': np.random.randint(1, 100, 50),
    'cantidad': np.random.randint(1, 20, 50),
    'precio_unitario': np.random.uniform(10.0, 500.0, 50).round(2),
    'categoria': np.random.choice(['Electru00f3nica', 'Ropa', 'Hogar', 'Deportes', 'Juguetes'], 50),
    'fecha_venta': [datetime.now().strftime('%Y-%m-%d') for _ in range(50)],
    'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste', 'Centro'], 50)
}

# Calcular el total de la venta
df = pd.DataFrame(data)
df['total_venta'] = df['cantidad'] * df['precio_unitario']

# Mostrar los primeros registros
df.head()

Unnamed: 0,producto_id,cantidad,precio_unitario,categoria,fecha_venta,region,total_venta
0,52,4,367.21,Electru00f3nica,2025-07-08,Este,1468.84
1,93,14,387.92,Deportes,2025-07-08,Centro,5430.88
2,15,18,46.28,Juguetes,2025-07-08,Este,833.04
3,72,9,185.65,Electru00f3nica,2025-07-08,Oeste,1670.85
4,61,2,66.78,Hogar,2025-07-08,Oeste,133.56


In [3]:
# Guardar en un archivo CSV
csv_path = '../data/raw/ventas_ejemplo.csv'

# Asegurar que el directorio existe
os.makedirs(os.path.dirname(csv_path), exist_ok=True)

# Guardar el DataFrame
df.to_csv(csv_path, index=False)

print(f"Datos guardados en {csv_path}")

Datos guardados en ../data/raw/ventas_ejemplo.csv


In [4]:
# Extraer datos del CSV utilizando nuestra funciu00f3n extract_from_csv
num_records = extract_from_csv(csv_path, 'ventas_csv')

print(f"Se extrajeron {num_records} registros del archivo CSV.")

Extracting data from CSV file... prueba de volumen
Se extrajeron 50 registros del archivo CSV.


Tambiu00e9n podemos simular la extracciu00f3n desde una API (aunque en este caso crearemos datos ficticios):

In [7]:
# Simular datos de una API
# En un caso real, usaru00edamos extract_from_api con una URL real

# Función para convertir tipos NumPy a tipos nativos de Python
def numpy_to_python(obj):
    """Convierte tipos de NumPy a tipos nativos de Python para serialización JSON"""
    if isinstance(obj, (np.integer)):
        return int(obj)
    elif isinstance(obj, (np.floating)):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, pd.Series):
        return obj.to_dict()
    else:
        return obj

# Crear datos simulados de una API
api_data = {
    'resumen_ventas': {
        'total_ventas': df['total_venta'].sum(),
        'promedio_venta': df['total_venta'].mean(),
        'max_venta': df['total_venta'].max(),
        'min_venta': df['total_venta'].min(),
        'total_productos': df['cantidad'].sum(),
        'fecha_reporte': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    },
    'ventas_por_categoria': df.groupby('categoria')['total_venta'].sum().to_dict(),
    'ventas_por_region': df.groupby('region')['total_venta'].sum().to_dict()
}

# En un caso real, estos datos vendru00edan de una API
# Pero aquu00ed los insertamos directamente
with engine.connect() as conn:
    conn.execute(
        text("INSERT INTO raw_data (timestamp, source, data) VALUES (:timestamp, :source, :data)"),
        {
            'timestamp': datetime.now().isoformat(),
            'source': 'api_simulada',
            'data': json.dumps(api_data, default=numpy_to_python)
        }
    )

print("Datos de API simulada insertados en raw_data.")

Datos de API simulada insertados en raw_data.


Verificamos los datos extraidos en la tabla raw_data:

In [None]:
# Consultar datos de raw_data
query = text("SELECT id, timestamp, source FROM raw_data")

with engine.connect() as conn:
    raw_data_summary = pd.read_sql(query, conn)

raw_data_summary

Unnamed: 0,id,timestamp,source
0,1,2025-05-18 23:51:50.303212+00:00,ejemplo_notebook
1,2,2025-05-18 23:51:50.323238+00:00,ejemplo_notebook
2,3,2025-05-18 23:51:50.325509+00:00,ejemplo_notebook
3,4,2025-05-18 23:51:50.328453+00:00,ejemplo_notebook
4,5,2025-05-18 23:51:50.330390+00:00,ejemplo_notebook
...,...,...,...
137,138,2025-07-08 00:01:20.424112+00:00,ventas_csv
138,139,2025-07-08 00:01:20.424149+00:00,ventas_csv
139,140,2025-07-08 00:01:20.424186+00:00,ventas_csv
140,141,2025-07-08 00:01:20.424223+00:00,ventas_csv


## 2. Transformaciu00f3n (Transform)

Ahora vamos a transformar los datos utilizando nuestra funciu00f3n transform_raw_data.

In [None]:
# Transformar todos los datos no procesados
num_transformed = transform_raw_data()

print(f"Se transformaron {num_transformed} registros.")

Error al transformar datos: the JSON object must be str, bytes or bytearray, not dict
Se transformaron 0 registros.


In [None]:
# Consultar datos transformados
query = text("""
    SELECT p.id, r.source, p.processed_at
    FROM processed_data p
    JOIN raw_data r ON p.raw_data_id = r.id
""")

with engine.connect() as conn:
    processed_data_summary = pd.read_sql(query, conn)

processed_data_summary

Examinemos los datos transformados con mu00e1s detalle:

In [None]:
# Consultar datos de ventas transformados
query = text("""
    SELECT p.id, r.source, p.data
    FROM processed_data p
    JOIN raw_data r ON p.raw_data_id = r.id
    WHERE r.source = 'ventas_csv'
    LIMIT 5
""")

with engine.connect() as conn:
    ventas_processed = pd.read_sql(query, conn)

# Extraer JSON para visualizaciu00f3n
for i, row in ventas_processed.iterrows():
    data = json.loads(row['data'])
    print(f"Registro {i+1}:{json.dumps(data, indent=2)}")

## 3. Carga (Load)

Finalmente, vamos a cargar los datos transformados en la tabla final_data y generar insights.

In [None]:
# Cargar datos transformados en la tabla final
num_loaded = load_to_final_table()

print(f"Se cargaron {num_loaded} registros en la tabla final.")

In [None]:
# Consultar datos finales
query = text("""
    SELECT f.id, r.source, f.metrics, f.insights
    FROM final_data f
    JOIN processed_data p ON f.processed_data_id = p.id
    JOIN raw_data r ON p.raw_data_id = r.id
""")

with engine.connect() as conn:
    final_data = pd.read_sql(query, conn)

final_data

Vamos a analizar los datos de ventas finales con mu00e1s detalle:

In [None]:
# Extraer mu00e9tricas de ventas
ventas_metrics = []

for _, row in final_data.iterrows():
    if row['source'] == 'ventas_csv':
        metrics = json.loads(row['metrics'])
        metrics['id'] = row['id']
        ventas_metrics.append(metrics)

# Convertir a DataFrame para anu00e1lisis
if ventas_metrics:
    ventas_df = pd.DataFrame(ventas_metrics)
    
    # Mostrar estadu00edsticas descriptivas
    print("Estadu00edsticas descriptivas de las ventas:")
    print(ventas_df.describe())
    
    # Visualizar datos
    if 'categoria' in ventas_df.columns and 'total_venta' in ventas_df.columns:
        plt.figure(figsize=(12, 6))
        
        # Ventas por categoru00eda
        ventas_por_categoria = ventas_df.groupby('categoria')['total_venta'].sum().sort_values(ascending=False)
        
        plt.bar(ventas_por_categoria.index, ventas_por_categoria.values)
        plt.title('Ventas Totales por Categoru00eda')
        plt.xlabel('Categoru00eda')
        plt.ylabel('Ventas Totales')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
else:
    print("No hay datos de ventas disponibles para analizar.")

## 4. Resumen del Proceso ETL

Vamos a crear un resumen del proceso ETL completo:

In [None]:
# Consultar conteos de cada tabla
with engine.connect() as conn:
    raw_count = conn.execute(text("SELECT COUNT(*) FROM raw_data")).scalar()
    processed_count = conn.execute(text("SELECT COUNT(*) FROM processed_data")).scalar()
    final_count = conn.execute(text("SELECT COUNT(*) FROM final_data")).scalar()

# Crear DataFrame de resumen
summary_data = {
    'Etapa': ['Extracciu00f3n (raw_data)', 'Transformaciu00f3n (processed_data)', 'Carga (final_data)'],
    'Registros': [raw_count, processed_count, final_count]
}

summary_df = pd.DataFrame(summary_data)

# Mostrar resumen
print("Resumen del Proceso ETL:")
print(summary_df)

# Visualizar resumen
plt.figure(figsize=(10, 6))
plt.bar(summary_df['Etapa'], summary_df['Registros'])
plt.title('Registros por Etapa del Proceso ETL')
plt.xlabel('Etapa')
plt.ylabel('Nu00famero de Registros')
plt.xticks(rotation=15)
plt.tight_layout()
plt.show()

## Conclusiu00f3n

En este notebook, hemos implementado un proceso ETL completo utilizando nuestro framework de ingenieru00eda de datos:

1. **Extracciu00f3n (Extract)**: Extrajimos datos de un archivo CSV y simulamos la extracciu00f3n desde una API.

2. **Transformaciu00f3n (Transform)**: Transformamos los datos crudos aplicando limpieza y normalizaciu00f3n.

3. **Carga (Load)**: Cargamos los datos transformados en la tabla final y generamos insights.

Este flujo de trabajo puede ser adaptado para diferentes fuentes de datos y requisitos de procesamiento.