# Pipeline ETL Simple

Este notebook demuestra c√≥mo construir un pipeline ETL completo usando Python y Pandas.

**Referencia:** [Fundamentos Python](../fundamentos/fundamentos-python.md)

**Objetivos:**
- Extraer datos de CSV
- Transformar y limpiar datos
- Cargar a base de datos (opcional)
- Visualizar resultados

## 1. Importar librer√≠as

In [None]:
import os
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from pathlib import Path

# Cargar variables de entorno desde .env en la ra√≠z del proyecto
env_path = Path().resolve().parent.parent / '.env'
if env_path.exists():
    load_dotenv(env_path)
    print("‚úÖ Variables de entorno cargadas desde .env")
else:
    print("üí° No se encontr√≥ .env. Usando valores por defecto.")
    print("   Crea .env desde .env.example para configurar rutas y credenciales")

print("‚úÖ Librer√≠as importadas")

## 2. Extract: Cargar datos

In [None]:
# Extract: Cargar datos
# En producci√≥n, cargar√≠as desde archivo usando rutas del .env

# Obtener rutas desde variables de entorno o usar valores por defecto
data_source = os.getenv('DATA_SOURCE_PATH', './data/raw')
data_output = os.getenv('DATA_OUTPUT_PATH', './data/processed')

print(f"üìÅ Ruta de datos fuente: {data_source}")
print(f"üìÅ Ruta de datos procesados: {data_output}")

# Para este ejemplo, creamos datos de ejemplo
# En producci√≥n, usar√≠as: df = pd.read_csv(os.path.join(data_source, 'ventas.csv'))
data = {
    'fecha': ['2024-01-15', '2024-01-16', '2024-01-17', '2024-01-18', '2024-01-19'],
    'producto': ['Producto A', 'Producto B', 'Producto A', 'Producto C', 'Producto B'],
    'cantidad': [5, 3, 2, 1, 4],
    'precio': [10.50, 25.00, 10.50, 50.00, 25.00],
    'cliente': ['Cliente 1', 'Cliente 2', 'Cliente 1', 'Cliente 3', 'Cliente 2']
}

df = pd.DataFrame(data)

print(f"\n‚úÖ Extra√≠dos {len(df)} registros")
print("\nPrimeras filas:")
df.head()

## 3. Transform: Limpiar y transformar

In [None]:
# Copiar para no modificar original
df_clean = df.copy()

# Convertir fecha a datetime
df_clean['fecha'] = pd.to_datetime(df_clean['fecha'])

# Calcular total
df_clean['total'] = df_clean['cantidad'] * df_clean['precio']

# Eliminar duplicados
df_clean = df_clean.drop_duplicates()

# Eliminar filas con valores nulos cr√≠ticos
df_clean = df_clean.dropna(subset=['fecha', 'producto', 'cantidad', 'precio'])

# Validar que cantidad y precio sean positivos
df_clean = df_clean[
    (df_clean['cantidad'] > 0) & 
    (df_clean['precio'] > 0)
]

print(f"‚úÖ Transformados {len(df_clean)} registros")
print("\nDatos transformados:")
df_clean.head()

## 4. Validar transformaciones

In [None]:
print("=== VALIDACIONES ===")
print(f"Registros originales: {len(df)}")
print(f"Registros despu√©s de transformaci√≥n: {len(df_clean)}")
print(f"Registros eliminados: {len(df) - len(df_clean)}")
print(f"\nTotal calculado: ‚Ç¨{df_clean['total'].sum():,.2f}")
print(f"Promedio por transacci√≥n: ‚Ç¨{df_clean['total'].mean():,.2f}")

## 5. Load: Visualizar resultados (simulado)

In [None]:
# Load: Guardar datos procesados
# Usar rutas desde variables de entorno

# Crear directorio de salida si no existe
os.makedirs(data_output, exist_ok=True)

# Guardar como Parquet (formato recomendado para Data Engineering)
output_file = os.path.join(data_output, 'ventas_procesadas.parquet')
df_clean.to_parquet(output_file, index=False)

print(f"‚úÖ Datos guardados en: {output_file}")
print(f"\nüí° En producci√≥n, tambi√©n podr√≠as cargar a:")
print("  - PostgreSQL/MySQL (usando DB_HOST, DB_NAME del .env)")
print("  - Data Warehouse")
print("  - API (usando API_URL, API_KEY del .env)")
print("  - S3 (usando AWS_ACCESS_KEY_ID, S3_BUCKET_NAME del .env)")

## 6. Visualizar resultados

In [None]:
# Ventas por producto
plt.figure(figsize=(10, 6))
ventas_producto = df_clean.groupby('producto')['total'].sum().sort_values(ascending=True)
ventas_producto.plot(kind='barh', color='steelblue')
plt.title('Ventas Totales por Producto')
plt.xlabel('Ventas Totales (‚Ç¨)')
plt.tight_layout()
plt.show()

In [None]:
# Resumen del pipeline
print("=" * 60)
print("RESUMEN DEL PIPELINE ETL")
print("=" * 60)
print(f"\n‚úÖ Extract: {len(df)} registros extra√≠dos")
print(f"‚úÖ Transform: {len(df_clean)} registros transformados")
print(f"‚úÖ Total procesado: ‚Ç¨{df_clean['total'].sum():,.2f}")
print(f"‚úÖ Promedio: ‚Ç¨{df_clean['total'].mean():,.2f}")
print("=" * 60)

---

**Pr√≥ximo paso:** Revisa otros notebooks:
- [Exploraci√≥n de Datos](01-exploracion-datos.ipynb)
- [Storytelling con Datos](02-storytelling-datos.ipynb)
- [Limpieza de Datos](04-limpieza-datos.ipynb)