In [None]:
def crear_banner(titulo, icono):
    """Crea un banner estilizado para secciones."""
    print("-" * 60)
    print(f"{icono} {titulo}")
    print("-" * 60)
    print()


# ============================================================================
# MANEJO DE BASES DE DATOS Y APIS
# ============================================================================

crear_banner("CONEXIÓN A BASES DE DATOS Y APIS", "*")

print(" **En esta sección aprenderemos:**")
print("   • Conectar a bases de datos SQLite")
print("   • Realizar operaciones CRUD")
print("   • Consumir APIs REST")
print("   • Automatizar la sincronización de datos")
print()

# ============================================================================
# PARTE 1: TRABAJANDO CON BASES DE DATOS
# ============================================================================

print(" **PARTE 1: Base de datos SQLite**")
print("SQLite es perfecto para aprender porque no necesita instalación")
print()

import sqlite3
import pandas as pd
import requests
import json
from datetime import datetime, timedelta

class GestorBaseDatos:
    """Clase para manejar operaciones de base de datos"""

    def __init__(self, nombre_db='empresa.db'):
        self.nombre_db = nombre_db
        self.conexion = None

    def conectar(self):
        """Establecer conexión con la base de datos"""
        try:
            self.conexion = sqlite3.connect(self.nombre_db)
            print(f" Conectado a la base de datos: {self.nombre_db}")
            return True
        except Exception as e:
            print(f" Error conectando a la base de datos: {e}")
            return False

    def crear_tablas(self):
        """Crear tablas necesarias"""
        try:
            cursor = self.conexion.cursor()

            # Tabla de productos
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS productos (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    nombre TEXT NOT NULL,
                    categoria TEXT,
                    precio REAL,
                    stock INTEGER,
                    fecha_creacion TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Tabla de ventas
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS ventas (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    producto_id INTEGER,
                    cantidad INTEGER,
                    precio_unitario REAL,
                    total REAL,
                    fecha_venta TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (producto_id) REFERENCES productos (id)
                )
            ''')

            # Tabla de clientes
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS clientes (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    nombre TEXT NOT NULL,
                    email TEXT UNIQUE,
                    telefono TEXT,
                    ciudad TEXT,
                    fecha_registro TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            self.conexion.commit()
            print(" Tablas creadas exitosamente")

        except Exception as e:
            print(f" Error creando tablas: {e}")

    def insertar_datos_ejemplo(self):
        """Insertar datos de ejemplo"""
        try:
            cursor = self.conexion.cursor()

            # Insertar productos
            productos = [
                ('Laptop Dell', 'Computadoras', 1200.00, 10),
                ('Mouse Logitech', 'Accesorios', 25.99, 50),
                ('Teclado Mecánico', 'Accesorios', 89.99, 30),
                ('Monitor 24"', 'Computadoras', 299.99, 15),
                ('Tablet Samsung', 'Computadoras', 450.00, 20)
            ]

            cursor.executemany(
                'INSERT INTO productos (nombre, categoria, precio, stock) VALUES (?, ?, ?, ?)',
                productos
            )

            # Insertar clientes
            clientes = [
                ('Ana García', 'ana@email.com', '555-0101', 'Madrid'),
                ('Luis Martín', 'luis@email.com', '555-0102', 'Barcelona'),
                ('María López', 'maria@email.com', '555-0103', 'Valencia'),
                ('Carlos Ruiz', 'carlos@email.com', '555-0104', 'Sevilla')
            ]

            cursor.executemany(
                'INSERT INTO clientes (nombre, email, telefono, ciudad) VALUES (?, ?, ?, ?)',
                clientes
            )

            # Insertar ventas
            import random
            ventas = []
            for _ in range(20):
                producto_id = random.randint(1, 5)
                cantidad = random.randint(1, 3)
                # Obtener precio del producto
                cursor.execute('SELECT precio FROM productos WHERE id = ?', (producto_id,))
                precio = cursor.fetchone()[0]
                total = precio * cantidad
                ventas.append((producto_id, cantidad, precio, total))

            cursor.executemany(
                'INSERT INTO ventas (producto_id, cantidad, precio_unitario, total) VALUES (?, ?, ?, ?)',
                ventas
            )

            self.conexion.commit()
            print(" Datos de ejemplo insertados")

        except Exception as e:
            print(f" Error insertando datos: {e}")

    def consultar_datos(self, query, params=None):
        """Ejecutar consulta y devolver DataFrame"""
        try:
            if params:
                df = pd.read_sql_query(query, self.conexion, params=params)
            else:
                df = pd.read_sql_query(query, self.conexion)
            return df
        except Exception as e:
            print(f" Error en consulta: {e}")
            return None

    def cerrar_conexion(self):
        """Cerrar conexión"""
        if self.conexion:
            self.conexion.close()
            print(" Conexión cerrada")

# Demostración de base de datos
print(" **Creando y poblando base de datos:**")

db = GestorBaseDatos()
if db.conectar():
    db.crear_tablas()
    db.insertar_datos_ejemplo()

    # Realizar algunas consultas
    print("\n **Consultas de ejemplo:**")

    # Consulta 1: Productos más vendidos
    query1 = '''
        SELECT p.nombre, SUM(v.cantidad) as total_vendido, SUM(v.total) as ingresos
        FROM productos p
        JOIN ventas v ON p.id = v.producto_id
        GROUP BY p.id, p.nombre
        ORDER BY total_vendido DESC
    '''

    resultado1 = db.consultar_datos(query1)
    print(" **Productos más vendidos:**")
    print(resultado1)

    # Consulta 2: Ventas por categoría
    query2 = '''
        SELECT p.categoria, COUNT(v.id) as num_ventas, SUM(v.total) as ingresos_totales
        FROM productos p
        JOIN ventas v ON p.id = v.producto_id
        GROUP BY p.categoria
    '''

    resultado2 = db.consultar_datos(query2)
    print("\n **Ventas por categoría:**")
    print(resultado2)

    db.cerrar_conexion()

# ============================================================================
# PARTE 2: CONSUMO DE APIS
# ============================================================================

print("\n **PARTE 2: Consumiendo APIs REST**")
print("Vamos a obtener datos de APIs públicas y procesarlos")
print()

class ConsumidorAPI:
    """Clase para consumir APIs REST"""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'ETL-Python-Script/1.0'
        })

    def obtener_datos_clima(self, ciudad='Madrid'):
        """Obtener datos del clima (API gratuita)"""
        try:
            # Usando OpenWeatherMap API (necesitarías una API key real)
            # Para la demo, simularemos los datos
            print(f" Simulando obtención de datos del clima para {ciudad}...")

            # Datos simulados
            datos_clima = {
                'ciudad': ciudad,
                'temperatura': 22.5,
                'humedad': 65,
                'descripcion': 'Parcialmente nublado',
                'fecha': datetime.now().isoformat()
            }

            return datos_clima

        except Exception as e:
            print(f" Error obteniendo datos del clima: {e}")
            return None

    def obtener_datos_financieros(self):
        """Obtener datos financieros simulados"""
        try:
            print(" Simulando obtención de datos financieros...")

            # Simular datos de acciones
            import random
            acciones = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
            datos_financieros = []

            for accion in acciones:
                precio = round(random.uniform(100, 300), 2)
                cambio = round(random.uniform(-5, 5), 2)
                datos_financieros.append({
                    'simbolo': accion,
                    'precio': precio,
                    'cambio': cambio,
                    'porcentaje_cambio': round((cambio/precio)*100, 2),
                    'fecha': datetime.now().isoformat()
                })

            return datos_financieros

        except Exception as e:
            print(f" Error obteniendo datos financieros: {e}")
            return None

    def procesar_y_guardar_datos_api(self):
        """Procesar datos de API y guardarlos"""
        try:
            # Obtener datos del clima
            clima = self.obtener_datos_clima()
            if clima:
                df_clima = pd.DataFrame([clima])
                df_clima.to_csv('datos_clima.csv', index=False)
                print(" Datos del clima guardados en datos_clima.csv")

            # Obtener datos financieros
            finanzas = self.obtener_datos_financieros()
            if finanzas:
                df_finanzas = pd.DataFrame(finanzas)
                df_finanzas.to_csv('datos_financieros.csv', index=False)
                print(" Datos financieros guardados en datos_financieros.csv")

                # Mostrar resumen
                print("\n **Resumen de datos financieros:**")
                print(df_finanzas[['simbolo', 'precio', 'cambio', 'porcentaje_cambio']])

        except Exception as e:
            print(f" Error procesando datos de API: {e}")

# Demostración de APIs
print(" **Consumiendo APIs y procesando datos:**")
api = ConsumidorAPI()
api.procesar_y_guardar_datos_api()

# ============================================================================
# PARTE 3: ETL COMPLETO CON MÚLTIPLES FUENTES
# ============================================================================

print("\n **PARTE 3: ETL completo combinando BD y APIs**")

class ETLCompleto:
    """ETL que combina base de datos y APIs"""

    def __init__(self):
        self.db = GestorBaseDatos('etl_completo.db')
        self.api = ConsumidorAPI()

    def ejecutar_etl_completo(self):
        """Ejecutar ETL completo"""
        print(" Iniciando ETL completo...")

        # 1. EXTRACT: Obtener datos de múltiples fuentes
        print("\n EXTRACT: Obteniendo datos...")

        # Conectar a BD
        if not self.db.conectar():
            return False

        self.db.crear_tablas()
        self.db.insertar_datos_ejemplo()

        # Obtener datos de BD
        ventas_bd = self.db.consultar_datos('''
            SELECT p.nombre as producto, p.categoria, v.cantidad, v.total, v.fecha_venta
            FROM productos p
            JOIN ventas v ON p.id = v.producto_id
        ''')

        # Obtener datos de APIs
        datos_clima = self.api.obtener_datos_clima()
        datos_financieros = self.api.obtener_datos_financieros()

        # 2. TRANSFORM: Procesar y combinar datos
        print("\n TRANSFORM: Procesando datos...")

        # Procesar ventas
        ventas_bd['fecha_venta'] = pd.to_datetime(ventas_bd['fecha_venta'])
        ventas_bd['mes'] = ventas_bd['fecha_venta'].dt.month
        ventas_bd['dia_semana'] = ventas_bd['fecha_venta'].dt.day_name()

        # Crear resumen de ventas
        resumen_ventas = ventas_bd.groupby(['categoria', 'mes']).agg({
            'cantidad': 'sum',
            'total': 'sum'
        }).reset_index()

        # Agregar datos externos
        if datos_clima:
            resumen_ventas['temperatura_actual'] = datos_clima['temperatura']
            resumen_ventas['fecha_clima'] = datos_clima['fecha']

        # 3. LOAD: Guardar datos procesados
        print("\n LOAD: Guardando datos procesados...")

        # Guardar en múltiples formatos
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Excel con múltiples hojas
        with pd.ExcelWriter(f'reporte_etl_completo_{timestamp}.xlsx') as writer:
            ventas_bd.to_excel(writer, sheet_name='Ventas_Detalle', index=False)
            resumen_ventas.to_excel(writer, sheet_name='Resumen_Ventas', index=False)

            if datos_financieros:
                pd.DataFrame(datos_financieros).to_excel(writer, sheet_name='Datos_Financieros', index=False)

        # CSV para análisis
        resumen_ventas.to_csv(f'resumen_ventas_{timestamp}.csv', index=False)

        # JSON para APIs
        reporte_json = {
            'fecha_generacion': datetime.now().isoformat(),
            'total_ventas': float(ventas_bd['total'].sum()),
            'num_transacciones': len(ventas_bd),
            'categorias': resumen_ventas.groupby('categoria')['total'].sum().to_dict(),
            'datos_clima': datos_clima,
            'datos_financieros': datos_financieros[:3] if datos_financieros else None
        }

        with open(f'reporte_completo_{timestamp}.json', 'w') as f:
            json.dump(reporte_json, f, indent=2, default=str)

        self.db.cerrar_conexion()

        print(" ETL completo finalizado exitosamente!")
        print(f" Total de ventas procesadas: ${ventas_bd['total'].sum():,.2f}")
        print(f" Archivos generados:")
        print(f"    reporte_etl_completo_{timestamp}.xlsx")
        print(f"    resumen_ventas_{timestamp}.csv")
        print(f"    reporte_completo_{timestamp}.json")

        return True

# Ejecutar ETL completo
print(" **Ejecutando ETL completo con múltiples fuentes:**")
etl = ETLCompleto()
etl.ejecutar_etl_completo()