# Paso 1: Configuración del Entorno

In [0]:
%pip install beautifulsoup4 requests pandas

Python interpreter will be restarted.
Python interpreter will be restarted.


# Importar librerías

In [0]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import datetime



# Paso 2: Clase DataWeb

### Clase especializada para extraer datos financieros de Yahoo Finance

In [0]:
class DataWeb:
    """
    Clase especializada para extraer datos financieros de Yahoo Finance
    Optimizada para Databricks Community Edition
    """
    
    def __init__(self, listado_indicadores=[]):
        self.listado_indicadores = listado_indicadores
        self.url_historicos = "https://es.finance.yahoo.com/quote/{}/history/"
        self.url_perfil = "https://es.finance.yahoo.com/quote/{}/"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
    
    def obtener_datos(self, indicador=""):
        """
        Extrae datos históricos de un indicador financiero
        """
        try:
            url = self.url_historicos.format(indicador)
            print(f"🔍 Extrayendo datos de: {indicador}")
            
            respuesta = requests.get(url, headers=self.headers, timeout=30)
            if respuesta.status_code != 200:
                print(f"❌ Error HTTP {respuesta.status_code} para {indicador}")
                return pd.DataFrame()
            
            soup = BeautifulSoup(respuesta.text, 'html.parser')
            tabla = soup.select_one('div[data-testid="history-table"] table')
            
            if not tabla:
                print(f"❌ No se encontró tabla de datos para {indicador}")
                return pd.DataFrame()
            
            # Extraer nombres de columnas
            nombre_columnas = [th.get_text(strip=True) for th in tabla.thead.find_all('th')]
            
            # Extraer filas de datos
            filas = []
            for tr in tabla.tbody.find_all('tr'):
                columnas = [td.get_text(strip=True) for td in tr.find_all('td')]
                if len(columnas) == len(nombre_columnas):
                    filas.append(columnas)
            
            # Crear DataFrame
            df = pd.DataFrame(filas, columns=nombre_columnas)
            
            # Normalizar nombres de columnas
            df = df.rename(columns=self._normalizar_columnas())
            
            # Convertir tipos de datos
            df = self.convertir_numericos(df)
            
            # Agregar metadatos
            df["cod_indicador"] = indicador
            df["fecha_extraccion"] = datetime.datetime.now()
            
            print(f"✅ Extraídos {len(df)} registros para {indicador}")
            return df
            
        except Exception as err:
            print(f"❌ Error en obtener_datos para {indicador}: {str(err)}")
            return pd.DataFrame()
    
    def obtener_metadatos(self, indicador=""):
        """
        Extrae metadatos del indicador (nombre, mercado, etc.)
        """
        try:
            url = self.url_perfil.format(indicador)
            respuesta = requests.get(url, headers=self.headers, timeout=30)
            
            if respuesta.status_code != 200:
                return self._metadatos_default(indicador)
            
            soup = BeautifulSoup(respuesta.text, 'html.parser')
            
            # Extraer información básica
            nombre = self._extraer_nombre(soup, indicador)
            moneda = self._extraer_moneda(soup)
            mercado = self._extraer_mercado(soup)
            clasificacion = self._inferir_clasificacion(indicador)
            pais = self._inferir_pais(indicador, mercado)
            
            metadatos = {
                'cod_indicador': indicador,
                'nombre': nombre,
                'pais': pais,
                'clasificacion': clasificacion,
                'moneda': moneda,
                'mercado': mercado,
                'fecha_actualizacion': datetime.datetime.now(),
                'activo': True
            }
            
            print(f"📋 Metadatos extraídos para {indicador}: {nombre}")
            return metadatos
            
        except Exception as err:
            print(f"⚠️ Error extrayendo metadatos para {indicador}: {str(err)}")
            return self._metadatos_default(indicador)
    
    def convertir_numericos(self, df=pd.DataFrame()):
        """
        Convierte columnas numéricas al formato correcto
        """
        df = df.copy()
        if len(df) > 0:
            columnas_numericas = ['abrir', 'max', 'min', 'cerrar', 'cierre_ajustado', 'volumen']
            for col in columnas_numericas:
                if col in df.columns:
                    df[col] = (df[col]
                              .str.replace(r"[^\d,.-]", "", regex=True)  # Eliminar caracteres no numéricos
                              .str.replace(r"\.(?=.*\.)", "", regex=True)  # Eliminar puntos excepto el decimal
                              .str.replace(",", ".")  # Convertir coma decimal a punto
                              .replace(["-", ""], "0")  # Reemplazar guiones y vacíos con 0
                              )
                    # Convertir a numérico
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
        return df
    
    def _normalizar_columnas(self):
        """
        Mapeo de nombres de columnas en español a nombres normalizados
        """
        return {
            'Fecha': 'fecha',
            'Abrir': 'abrir',
            'Máx.': 'max',
            'Mín.': 'min',
            'Cerrar': 'cerrar',
            'Cierre ajustado': 'cierre_ajustado',
            'Volumen': 'volumen'
        }
    
    def _extraer_nombre(self, soup, indicador):
        """Extrae el nombre del instrumento"""
        try:
            # Buscar diferentes selectores para el nombre
            selectores = [
                'h1[data-testid="quote-header"]',
                'h1.yf-xxbei9',
                '.yf-xxbei9',
                'h1'
            ]
            
            for selector in selectores:
                elemento = soup.select_one(selector)
                if elemento:
                    nombre = elemento.get_text(strip=True)
                    if nombre and len(nombre) > 1:
                        return nombre[:100]  # Limitar longitud
            
            return f"Instrumento {indicador}"
        except:
            return f"Instrumento {indicador}"
    
    def _extraer_moneda(self, soup):
        """Extrae la moneda del instrumento"""
        try:
            # Buscar indicadores de moneda
            texto = soup.get_text().upper()
            if 'USD' in texto:
                return 'USD'
            elif 'EUR' in texto:
                return 'EUR'
            elif 'GBP' in texto:
                return 'GBP'
            elif 'JPY' in texto:
                return 'JPY'
            else:
                return 'USD'  # Por defecto
        except:
            return 'USD'
    
    def _extraer_mercado(self, soup):
        """Extrae información del mercado"""
        try:
            # Buscar información de mercado
            mercado_elem = soup.select_one('.exchange')
            if mercado_elem:
                return mercado_elem.get_text(strip=True)[:50]
            return 'N/A'
        except:
            return 'N/A'
    
    def _inferir_clasificacion(self, indicador):
        """Infiere la clasificación del instrumento basado en el código"""
        indicador_upper = indicador.upper()
        
        if '^' in indicador or 'IBEX' in indicador_upper or 'STOXX' in indicador_upper:
            return 'INDICE'
        elif '=F' in indicador or 'CL=' in indicador or 'GC=' in indicador:
            return 'COMMODITY'
        elif '=X' in indicador or 'USD' in indicador_upper or 'EUR' in indicador_upper:
            return 'DIVISA'
        elif 'BTC' in indicador_upper or 'ETH' in indicador_upper:
            return 'CRYPTO'
        elif '.MC' in indicador:
            return 'ACCION_ES'
        else:
            return 'ACCION'
    
    def _inferir_pais(self, indicador, mercado):
        """Infiere el país basado en el código y mercado"""
        if '.MC' in indicador or 'IBEX' in indicador.upper():
            return 'España'
        elif 'STOXX' in indicador.upper():
            return 'Europa'
        elif mercado and ('NYSE' in mercado.upper() or 'NASDAQ' in mercado.upper()):
            return 'Estados Unidos'
        else:
            return 'Internacional'
    
    def _metadatos_default(self, indicador):
        """Retorna metadatos por defecto cuando no se pueden extraer"""
        return {
            'cod_indicador': indicador,
            'nombre': f'Instrumento {indicador}',
            'pais': 'N/A',
            'clasificacion': self._inferir_clasificacion(indicador),
            'moneda': 'USD',
            'mercado': 'N/A',
            'fecha_actualizacion': datetime.datetime.now(),
            'activo': True
        }

# Ejemplo de uso
print("🚀 Clase DataWeb mejorada lista para usar")

🚀 Clase DataWeb mejorada lista para usar


# Paso 3: Crear Esquema y Tablas Delta

In [0]:
%sql
DROP TABLE IF EXISTS finanzas_schema.indicador_parametros;
DROP TABLE IF EXISTS finanzas_schema.indicador_financiero;
DROP TABLE IF EXISTS finanzas_schema.indicador_demografico;


In [0]:
%sql
DROP SCHEMA IF EXISTS finanzas_schema CASCADE;

In [0]:
%sql
-- Crear esquema principal
CREATE SCHEMA IF NOT EXISTS finanzas_schema
COMMENT 'Esquema para datos financieros y análisis de mercados';

In [0]:
%sql
-- TABLA 1: PARÁMETROS DE INDICADORES
-- =================================================
DROP TABLE IF EXISTS finanzas_schema.indicador_parametros;



In [0]:
%sql
CREATE TABLE IF NOT EXISTS finanzas_schema.indicador_parametros (
    cod_indicador STRING NOT NULL,
    descripcion STRING,
    prioridad INT,
    patron_clasificacion STRING,
    activo BOOLEAN,
    fecha_creacion TIMESTAMP
)
USING DELTA
COMMENT 'Tabla de configuración y patrones para clasificación de instrumentos financieros'
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);

In [0]:
%sql
select * from finanzas_schema.indicador_parametros ;

cod_indicador,descripcion,prioridad,patron_clasificacion,activo,fecha_creacion
^STOXX50E,Euro Stoxx 50 - Índice de las 50 principales empresas de la eurozona,1,INDICE_EUROPEO,True,2025-05-22T02:15:29.571+0000
^IBEX,IBEX 35 - Índice principal de la bolsa española,1,INDICE_ESPAÑOL,True,2025-05-22T02:15:29.571+0000
^FCHI,CAC 40 - Índice de la bolsa de París,1,INDICE_FRANCES,True,2025-05-22T02:15:29.571+0000
GC=F,Oro - Futuros del metal precioso,1,COMMODITY_METAL,True,2025-05-22T02:15:29.571+0000
CL=F,Petróleo WTI - Futuros de crudo,1,COMMODITY_ENERGIA,True,2025-05-22T02:15:29.571+0000
EURUSD=X,Euro / Dólar estadounidense,1,DIVISA_MAYOR,True,2025-05-22T02:15:29.571+0000
BTC-EUR,Bitcoin en euros,1,CRYPTO_MAYOR,True,2025-05-22T02:15:29.571+0000
SAN.MC,Banco Santander - Acción española,2,ACCION_ESPAÑOLA_BANCARIA,True,2025-05-22T02:15:29.571+0000
TEF.MC,Telefónica - Acción española de telecomunicaciones,2,ACCION_ESPAÑOLA_TELCO,True,2025-05-22T02:15:29.571+0000


In [0]:
%sql
SHOW CREATE TABLE finanzas_schema.indicador_parametros;

createtab_stmt
"CREATE TABLE spark_catalog.finanzas_schema.indicador_parametros (  cod_indicador STRING NOT NULL,  descripcion STRING,  prioridad INT,  patron_clasificacion STRING,  activo BOOLEAN,  fecha_creacion TIMESTAMP) USING delta COMMENT 'Tabla de configuración y patrones para clasificación de instrumentos financieros' TBLPROPERTIES (  'delta.autoOptimize.autoCompact' = 'true',  'delta.autoOptimize.optimizeWrite' = 'true',  'delta.minReaderVersion' = '1',  'delta.minWriterVersion' = '2')"


In [0]:
%sql
SHOW TABLES IN finanzas_schema;

database,tableName,isTemporary
finanzas_schema,indicador_demografico,False
finanzas_schema,indicador_financiero,False
finanzas_schema,indicador_parametros,False


In [0]:
%sql
-- TABLA 1: PARÁMETROS DE INDICADORES
CREATE  TABLE IF NOT EXISTS finanzas_schema.indicador_parametros (
    cod_indicador STRING,
    descripcion STRING,
    prioridad INT,
    patron_clasificacion STRING,
    activo BOOLEAN,
    fecha_creacion TIMESTAMP
)
USING DELTA
LOCATION 'dbfs:/delta/finanzas/indicador_parametros'
COMMENT 'Tabla de configuración y patrones para clasificación de instrumentos financieros'
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);

In [0]:
%sql
-- TABLA 2: DATOS HISTÓRICOS FINANCIEROS
CREATE TABLE IF NOT EXISTS finanzas_schema.indicador_financiero (
    fecha STRING,
    abrir DOUBLE,
    max DOUBLE,
    min DOUBLE,
    cerrar DOUBLE,
    cierre_ajustado DOUBLE,
    volumen BIGINT,
    cod_indicador STRING,
    fecha_extraccion TIMESTAMP
)
USING DELTA
LOCATION 'dbfs:/delta/finanzas/indicador_financiero'
PARTITIONED BY (cod_indicador)
COMMENT 'Tabla principal con datos históricos de instrumentos financieros'
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);

In [0]:
%sql
SHOW TABLES IN finanzas_schema;

database,tableName,isTemporary
finanzas_schema,indicador_demografico,False
finanzas_schema,indicador_financiero,False
finanzas_schema,indicador_parametros,False


In [0]:
%sql
-- TABLA 3: METADATOS DEMOGRÁFICOS
CREATE TABLE IF NOT EXISTS finanzas_schema.indicador_demografico (
    cod_indicador STRING,
    nombre STRING,
    pais STRING,
    clasificacion STRING,
    moneda STRING,
    mercado STRING,
    fecha_actualizacion TIMESTAMP,
    activo BOOLEAN
)
USING DELTA
LOCATION 'dbfs:/delta/finanzas/indicador_demografico'
COMMENT 'Tabla maestro de metadatos e información demográfica de instrumentos financieros'
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);


In [0]:
%sql
SHOW TABLES IN finanzas_schema;

database,tableName,isTemporary
finanzas_schema,indicador_demografico,False
finanzas_schema,indicador_financiero,False
finanzas_schema,indicador_parametros,False


In [0]:
%sql
DESCRIBE EXTENDED finanzas_schema.indicador_parametros;

col_name,data_type,comment
cod_indicador,string,
descripcion,string,
prioridad,int,
patron_clasificacion,string,
activo,boolean,
fecha_creacion,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,finanzas_schema,


In [0]:
%sql
DESCRIBE EXTENDED finanzas_schema.indicador_financiero;


col_name,data_type,comment
fecha,string,
abrir,double,
max,double,
min,double,
cerrar,double,
cierre_ajustado,double,
volumen,bigint,
cod_indicador,string,
fecha_extraccion,timestamp,
# Partition Information,,


In [0]:
%sql
DESCRIBE EXTENDED finanzas_schema.indicador_demografico;

col_name,data_type,comment
cod_indicador,string,
nombre,string,
pais,string,
clasificacion,string,
moneda,string,
mercado,string,
fecha_actualizacion,timestamp,
activo,boolean,
,,
# Detailed Table Information,,


### # Paso 4: Poblar Tabla de Parámetros

In [0]:
%sql
-- =================================================
-- POBLAR TABLA DE PARÁMETROS
-- =================================================
INSERT INTO finanzas_schema.indicador_parametros VALUES
-- ÍNDICES BURSÁTILES
('^STOXX50E', 'Euro Stoxx 50 - Índice de las 50 principales empresas de la eurozona', 1, 'INDICE_EUROPEO', true, current_timestamp()),
('^IBEX', 'IBEX 35 - Índice principal de la bolsa española', 1, 'INDICE_ESPAÑOL', true, current_timestamp()),
('^FCHI', 'CAC 40 - Índice de la bolsa de París', 1, 'INDICE_FRANCES', true, current_timestamp()),

-- COMMODITIES
('GC=F', 'Oro - Futuros del metal precioso', 1, 'COMMODITY_METAL', true, current_timestamp()),
('CL=F', 'Petróleo WTI - Futuros de crudo', 1, 'COMMODITY_ENERGIA', true, current_timestamp()),

-- DIVISAS
('EURUSD=X', 'Euro / Dólar estadounidense', 1, 'DIVISA_MAYOR', true, current_timestamp()),

-- CRIPTOMONEDAS
('BTC-EUR', 'Bitcoin en euros', 1, 'CRYPTO_MAYOR', true, current_timestamp()),

-- ACCIONES ESPAÑOLAS (muestra)
('SAN.MC', 'Banco Santander - Acción española', 2, 'ACCION_ESPAÑOLA_BANCARIA', true, current_timestamp()),
('TEF.MC', 'Telefónica - Acción española de telecomunicaciones', 2, 'ACCION_ESPAÑOLA_TELCO', true, current_timestamp());

num_affected_rows,num_inserted_rows
9,9


In [0]:
%sql
SELECT 'Parámetros insertados:' as info, COUNT(*) as cantidad 
FROM finanzas_schema.indicador_parametros;

-- Mostrar configuración
SELECT 
    cod_indicador, 
    descripcion, 
    patron_clasificacion, 
    prioridad,
    activo
FROM finanzas_schema.indicador_parametros 
ORDER BY prioridad, cod_indicador;

cod_indicador,descripcion,patron_clasificacion,prioridad,activo
BTC-EUR,Bitcoin en euros,CRYPTO_MAYOR,1,True
CL=F,Petróleo WTI - Futuros de crudo,COMMODITY_ENERGIA,1,True
EURUSD=X,Euro / Dólar estadounidense,DIVISA_MAYOR,1,True
GC=F,Oro - Futuros del metal precioso,COMMODITY_METAL,1,True
^FCHI,CAC 40 - Índice de la bolsa de París,INDICE_FRANCES,1,True
^IBEX,IBEX 35 - Índice principal de la bolsa española,INDICE_ESPAÑOL,1,True
^STOXX50E,Euro Stoxx 50 - Índice de las 50 principales empresas de la eurozona,INDICE_EUROPEO,1,True
SAN.MC,Banco Santander - Acción española,ACCION_ESPAÑOLA_BANCARIA,2,True
TEF.MC,Telefónica - Acción española de telecomunicaciones,ACCION_ESPAÑOLA_TELCO,2,True


# Paso 5: Pipeline de Extracción y Carga

In [0]:
# =================================================
# PASO 5: PIPELINE DE EXTRACCIÓN Y CARGA COMPLETO
# =================================================

import time
from pyspark.sql.functions import current_timestamp, lit, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType, DoubleType, LongType

def pipeline_extraccion_completo():
    """
    Pipeline ETL completo para datos financieros
    Extrae, transforma y carga datos de Yahoo Finance a Delta Lake
    """
    
    print("🚀 INICIANDO PIPELINE ETL FINANCIERO")
    print("=" * 50)
    
    # VERIFICAR PRERREQUISITOS
    try:
        count_parametros = spark.sql("SELECT COUNT(*) as count FROM finanzas_schema.indicador_parametros").collect()[0]['count']
        print(f"✅ Tabla de parámetros verificada: {count_parametros} indicadores configurados")
        
        if count_parametros == 0:
            print("❌ La tabla de parámetros está vacía. Ejecuta primero el setup del esquema.")
            return None
            
    except Exception as e:
        print(f"❌ Error verificando prerrequisitos: {str(e)}")
        print("💡 Ejecuta primero: setup_completo() o el script SQL de creación de tablas")
        return None
    
    # OBTENER INDICADORES ACTIVOS
    print("\n📋 Obteniendo lista de indicadores activos...")
    indicadores_df = spark.sql("""
        SELECT cod_indicador, descripcion, prioridad, patron_clasificacion
        FROM finanzas_schema.indicador_parametros 
        WHERE activo = true 
        ORDER BY prioridad, cod_indicador
    """)
    
    print("📊 Indicadores a procesar:")
    indicadores_df.show(truncate=False)
    
    indicadores_list = [row['cod_indicador'] for row in indicadores_df.collect()]
    
    # INICIALIZAR EXTRACTOR DE DATOS
    print(f"\n⚡ Inicializando extractor de datos para {len(indicadores_list)} indicadores...")
    dataweb = DataWeb(indicadores_list)
    
    # CONTADORES PARA ESTADÍSTICAS
    total_registros = 0
    indicadores_exitosos = 0
    indicadores_fallidos = 0
    errores_detalle = []
    
    print(f"\n🔄 INICIANDO EXTRACCIÓN Y CARGA")
    print("=" * 40)
    
    # PROCESAR CADA INDICADOR
    for i, indicador in enumerate(indicadores_list, 1):
        print(f"\n📊 [{i}/{len(indicadores_list)}] Procesando: {indicador}")
        print("-" * 50)
        
        try:
            # PASO 1: EXTRAER DATOS HISTÓRICOS
            print(f"🔍 1. Extrayendo datos históricos de Yahoo Finance...")
            df_pandas = dataweb.obtener_datos(indicador)
            
            if df_pandas.empty:
                print(f"⚠️ No se obtuvieron datos históricos para {indicador}")
                indicadores_fallidos += 1
                errores_detalle.append(f"{indicador}: Sin datos históricos disponibles")
                continue
            
            print(f"✅ Extraídos {len(df_pandas)} registros históricos")
            
            # PASO 2: EXTRAER METADATOS
            print(f"📋 2. Extrayendo metadatos del instrumento...")
            metadatos = dataweb.obtener_metadatos(indicador)
            print(f"✅ Metadatos obtenidos: {metadatos['nombre']}")
            
            # PASO 3: TRANSFORMAR A SPARK DATAFRAME
            print(f"⚡ 3. Transformando a Spark DataFrame...")
            
            # Definir esquema explícito para datos históricos
            schema_financiero = StructType([
                StructField("fecha", StringType(), True),
                StructField("abrir", DoubleType(), True),
                StructField("max", DoubleType(), True),
                StructField("min", DoubleType(), True),
                StructField("cerrar", DoubleType(), True),
                StructField("cierre_ajustado", DoubleType(), True),
                StructField("volumen", LongType(), True),
                StructField("cod_indicador", StringType(), True),
                StructField("fecha_extraccion", TimestampType(), True)
            ])
            
            df_spark = spark.createDataFrame(df_pandas, schema=schema_financiero)
            print(f"✅ DataFrame Spark creado con {df_spark.count()} registros")
            
            # PASO 4: CARGAR DATOS HISTÓRICOS (UPSERT)
            print(f"💾 4. Cargando datos históricos a Delta Lake...")
            df_spark.createOrReplaceTempView("temp_financiero")
            
            merge_historicos = f"""
            MERGE INTO finanzas_schema.indicador_financiero AS target
            USING temp_financiero AS source
            ON target.fecha = source.fecha AND target.cod_indicador = source.cod_indicador
            WHEN MATCHED THEN
              UPDATE SET
                abrir = source.abrir,
                max = source.max,
                min = source.min,
                cerrar = source.cerrar,
                cierre_ajustado = source.cierre_ajustado,
                volumen = source.volumen,
                fecha_extraccion = source.fecha_extraccion
            WHEN NOT MATCHED THEN
              INSERT (fecha, abrir, max, min, cerrar, cierre_ajustado, volumen, cod_indicador, fecha_extraccion)
              VALUES (source.fecha, source.abrir, source.max, source.min, source.cerrar, 
                     source.cierre_ajustado, source.volumen, source.cod_indicador, source.fecha_extraccion)
            """
            
            spark.sql(merge_historicos)
            print(f"✅ Datos históricos cargados exitosamente")
            
            # PASO 5: CARGAR/ACTUALIZAR METADATOS
            print(f"📝 5. Actualizando metadatos demográficos...")
            
            # Crear DataFrame de metadatos con esquema explícito
            schema_metadatos = StructType([
                StructField("cod_indicador", StringType(), True),
                StructField("nombre", StringType(), True),
                StructField("pais", StringType(), True),
                StructField("clasificacion", StringType(), True),
                StructField("moneda", StringType(), True),
                StructField("mercado", StringType(), True),
                StructField("fecha_actualizacion", TimestampType(), True),
                StructField("activo", BooleanType(), True)
            ])
            
            df_metadatos = spark.createDataFrame([metadatos], schema=schema_metadatos)
            df_metadatos.createOrReplaceTempView("temp_metadatos")
            
            merge_metadatos = f"""
            MERGE INTO finanzas_schema.indicador_demografico AS target
            USING temp_metadatos AS source
            ON target.cod_indicador = source.cod_indicador
            WHEN MATCHED THEN
              UPDATE SET
                nombre = source.nombre,
                pais = source.pais,
                clasificacion = source.clasificacion,
                moneda = source.moneda,
                mercado = source.mercado,
                fecha_actualizacion = source.fecha_actualizacion,
                activo = source.activo
            WHEN NOT MATCHED THEN
              INSERT (cod_indicador, nombre, pais, clasificacion, moneda, mercado, fecha_actualizacion, activo)
              VALUES (source.cod_indicador, source.nombre, source.pais, source.clasificacion, 
                     source.moneda, source.mercado, source.fecha_actualizacion, source.activo)
            """
            
            spark.sql(merge_metadatos)
            print(f"✅ Metadatos actualizados exitosamente")
            
            # ACTUALIZAR CONTADORES
            total_registros += len(df_pandas)
            indicadores_exitosos += 1
            
            print(f"🎯 {indicador} procesado exitosamente: {len(df_pandas)} registros")
            
            # PAUSA PARA NO SOBRECARGAR EL SERVIDOR
            print(f"⏳ Pausa de 2 segundos...")
            time.sleep(2)
            
        except Exception as e:
            print(f"❌ Error procesando {indicador}: {str(e)}")
            indicadores_fallidos += 1
            errores_detalle.append(f"{indicador}: {str(e)[:100]}")
            
            # Continuar con el siguiente indicador
            continue
    
    # OPTIMIZACIÓN POST-CARGA
    print(f"\n🔧 OPTIMIZANDO TABLAS DELTA...")
    optimizar_tablas_delta()
    
    # ESTADÍSTICAS FINALES
    mostrar_estadisticas_finales(indicadores_exitosos, indicadores_fallidos, total_registros, errores_detalle)
    
    # ANÁLISIS DE CALIDAD
    analizar_calidad_datos()
    
    return {
        'indicadores_exitosos': indicadores_exitosos,
        'indicadores_fallidos': indicadores_fallidos,
        'total_registros': total_registros,
        'errores': errores_detalle
    }

def optimizar_tablas_delta():
    """
    Optimiza las tablas Delta después de la carga masiva
    """
    try:
        print("🔧 Optimizando tabla de datos históricos...")
        spark.sql("OPTIMIZE finanzas_schema.indicador_financiero")
        
        print("🔧 Optimizando tabla de metadatos...")
        spark.sql("OPTIMIZE finanzas_schema.indicador_demografico")
        
        print("🔧 Optimizando tabla de parámetros...")
        spark.sql("OPTIMIZE finanzas_schema.indicador_parametros")
        
        print("✅ Optimización Delta completada")
        
    except Exception as e:
        print(f"⚠️ Error en optimización: {str(e)}")

def mostrar_estadisticas_finales(exitosos, fallidos, total_registros, errores):
    """
    Muestra estadísticas detalladas del pipeline
    """
    print(f"\n🎯 ESTADÍSTICAS FINALES DEL PIPELINE")
    print("=" * 45)
    print(f"✅ Indicadores procesados exitosamente: {exitosos}")
    print(f"❌ Indicadores fallidos: {fallidos}")
    print(f"📊 Total registros históricos procesados: {total_registros:,}")
    print(f"📈 Promedio registros por indicador: {total_registros // max(exitosos, 1):,}")
    
    if errores:
        print(f"\n⚠️ ERRORES ENCONTRADOS:")
        for error in errores:
            print(f"   • {error}")
    
    # ESTADÍSTICAS POR TABLA
    print(f"\n📋 REGISTROS POR TABLA:")
    stats_query = """
    SELECT 'indicador_financiero' as tabla, COUNT(*) as registros, COUNT(DISTINCT cod_indicador) as indicadores
    FROM finanzas_schema.indicador_financiero
    UNION ALL
    SELECT 'indicador_demografico' as tabla, COUNT(*) as registros, COUNT(DISTINCT cod_indicador) as indicadores
    FROM finanzas_schema.indicador_demografico
    UNION ALL
    SELECT 'indicador_parametros' as tabla, COUNT(*) as registros, COUNT(DISTINCT cod_indicador) as indicadores
    FROM finanzas_schema.indicador_parametros
    """
    
    spark.sql(stats_query).show()

def analizar_calidad_datos():
    """
    Análisis detallado de calidad de los datos cargados
    """
    print(f"\n🔍 ANÁLISIS DE CALIDAD DE DATOS")
    print("=" * 35)
    
    # 1. COMPLETITUD POR INDICADOR
    print("📊 Completitud de datos por indicador:")
    spark.sql("""
    SELECT 
        if.cod_indicador,
        id.nombre,
        id.clasificacion,
        COUNT(*) as total_registros,
        SUM(CASE WHEN abrir IS NULL OR abrir = 0 THEN 1 ELSE 0 END) as nulos_abrir,
        SUM(CASE WHEN cerrar IS NULL OR cerrar = 0 THEN 1 ELSE 0 END) as nulos_cerrar,
        SUM(CASE WHEN volumen IS NULL OR volumen = 0 THEN 1 ELSE 0 END) as sin_volumen,
        MIN(fecha) as fecha_desde,
        MAX(fecha) as fecha_hasta
    FROM finanzas_schema.indicador_financiero if
    LEFT JOIN finanzas_schema.indicador_demografico id ON if.cod_indicador = id.cod_indicador
    GROUP BY if.cod_indicador, id.nombre, id.clasificacion
    ORDER BY total_registros DESC
    """).show(truncate=False)
    
    # 2. DISTRIBUCIÓN POR CLASIFICACIÓN
    print("📊 Distribución por tipo de instrumento:")
    spark.sql("""
    SELECT 
        id.clasificacion,
        COUNT(DISTINCT if.cod_indicador) as num_indicadores,
        COUNT(*) as total_registros,
        AVG(if.cerrar) as precio_promedio
    FROM finanzas_schema.indicador_financiero if
    JOIN finanzas_schema.indicador_demografico id ON if.cod_indicador = id.cod_indicador
    GROUP BY id.clasificacion
    ORDER BY num_indicadores DESC
    """).show()
    
    # 3. TENDENCIAS RECIENTES
    print("📊 Últimos datos por indicador:")
    spark.sql("""
    WITH ultimos_datos AS (
        SELECT 
            cod_indicador,
            fecha,
            cerrar,
            ROW_NUMBER() OVER (PARTITION BY cod_indicador ORDER BY fecha DESC) as rn
        FROM finanzas_schema.indicador_financiero
        WHERE cerrar IS NOT NULL AND cerrar > 0
    )
    SELECT 
        ud.cod_indicador,
        id.nombre,
        ud.fecha as ultima_fecha,
        ud.cerrar as ultimo_precio,
        id.moneda
    FROM ultimos_datos ud
    JOIN finanzas_schema.indicador_demografico id ON ud.cod_indicador = id.cod_indicador
    WHERE ud.rn = 1
    ORDER BY ud.fecha DESC, ud.cod_indicador
    """).show(truncate=False)

def pipeline_indicador_individual(cod_indicador):
    """
    Ejecuta el pipeline para un solo indicador (útil para testing)
    """
    print(f"🎯 PIPELINE INDIVIDUAL PARA: {cod_indicador}")
    print("=" * 40)
    
    try:
        # Verificar que el indicador existe en parámetros
        exists = spark.sql(f"""
        SELECT COUNT(*) as count 
        FROM finanzas_schema.indicador_parametros 
        WHERE cod_indicador = '{cod_indicador}' AND activo = true
        """).collect()[0]['count']
        
        if exists == 0:
            print(f"❌ Indicador {cod_indicador} no encontrado en parámetros activos")
            return None
        
        # Ejecutar pipeline individual
        dataweb = DataWeb([cod_indicador])
        
        # Extraer datos
        df_pandas = dataweb.obtener_datos(cod_indicador)
        if df_pandas.empty:
            print(f"❌ No se obtuvieron datos para {cod_indicador}")
            return None
        
        # Extraer metadatos
        metadatos = dataweb.obtener_metadatos(cod_indicador)
        
        # Mostrar muestra de datos
        print(f"📊 Muestra de datos extraídos:")
        print(df_pandas.head())
        print(f"\n📋 Metadatos extraídos:")
        for key, value in metadatos.items():
            print(f"   {key}: {value}")
        
        return {
            'datos': df_pandas,
            'metadatos': metadatos,
            'registros': len(df_pandas)
        }
        
    except Exception as e:
        print(f"❌ Error en pipeline individual: {str(e)}")
        return None

def ejecutar_pipeline_modo_test():
    """
    Ejecuta el pipeline en modo test con solo algunos indicadores
    """
    print("🧪 EJECUTANDO PIPELINE EN MODO TEST")
    print("=" * 40)
    
    # Solo procesar indicadores de prioridad 1
    spark.sql("""
    UPDATE finanzas_schema.indicador_parametros 
    SET activo = CASE 
        WHEN prioridad = 1 THEN true 
        ELSE false 
    END
    """)
    
    # Ejecutar pipeline completo
    resultado = pipeline_extraccion_completo()
    
    # Restaurar todos los indicadores activos
    spark.sql("""
    UPDATE finanzas_schema.indicador_parametros 
    SET activo = true
    """)
    
    return resultado

