In [3]:
# INSTRUCCIONES DE INSTALACIÓN PARA GOOGLE COLAB:
"""
# Ejecutar en celdas separadas:

# Celda 1: Instalación de librerías
!pip install pandas openpyxl google-cloud-bigquery google-cloud-storage

# Celda 2: Subir archivo de credenciales (opcional para simulación)
from google.colab import files
import os

# Solo ejecutar si tienes credenciales reales
# uploaded = files.upload()
# for filename in uploaded.keys():
#     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = filename

# Celda 3: Ejecutar este código completo
"""

from datetime import datetime, timedelta
import pandas as pd
import hashlib
from io import StringIO
import json
import os
import numpy as np


In [9]:
# Celda 2: Configuración inicial
import os
from google.colab import files

# Subir el archivo JSON de la service account
print("Sube tu archivo JSON de Service Account:")
uploaded = files.upload()

# Configurar variable de entorno
for filename in uploaded.keys():
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = filename
    print(f"Configurado: {filename}")

Sube tu archivo JSON de Service Account:


Saving sri-vehiculos-etl-6e14a22d7578.json to sri-vehiculos-etl-6e14a22d7578 (1).json
Configurado: sri-vehiculos-etl-6e14a22d7578 (1).json


In [6]:
# ============================================================================
# CONFIGURACIÓN Y CLASES MOCK
# ============================================================================

# Variables de configuración
PROJECT_ID = 'sri-vehiculos-etl'
DATASET_ID = 'sri_vehiculos_dw'
BUCKET_NAME = 'sri-vehiculos-etl-bucket-angel'

print(f"Configuración:")
print(f"- Proyecto: {PROJECT_ID}")
print(f"- Dataset: {DATASET_ID}")
print(f"- Bucket: {BUCKET_NAME}")

Configuración:
- Proyecto: sri-vehiculos-etl
- Dataset: sri_vehiculos_dw
- Bucket: sri-vehiculos-etl-bucket-angel


In [7]:
# Celda 4: Crear dataset en BigQuery
from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)

# Crear dataset si no existe
dataset_ref = client.dataset(DATASET_ID)
try:
    client.get_dataset(dataset_ref)
    print(f"✅ Dataset {DATASET_ID} ya existe")
except:
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "US"  # o "EU" según tu preferencia
    dataset = client.create_dataset(dataset)
    print(f"✅ Dataset {DATASET_ID} creado exitosamente")


✅ Dataset sri_vehiculos_dw ya existe


In [17]:
# Celda 5: Crear bucket de Cloud Storage
from google.cloud import storage

storage_client = storage.Client()
BUCKET_NAME = "sri-vehiculos-etl-bucket-angel"

bucket = storage_client.bucket(BUCKET_NAME)

# Subimos un archivo de prueba
blob = bucket.blob("raw-data/.keep")
blob.upload_from_string('')
print("✅ Carpeta raw-data/ creada")

✅ Carpeta raw-data/ creada


In [18]:
# Celda 6: Subir archivo de datos SRI
from google.colab import files
from google.cloud import storage
import io

print("Sube tu archivo CSV de datos del SRI:")
uploaded_files = files.upload()

# Subir al bucket de Cloud Storage
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

for filename, content in uploaded_files.items():
    # Subir a la carpeta raw-data
    blob = bucket.blob(f'raw-data/sri_vehiculos.csv')
    blob.upload_from_string(content.decode('utf-8'))
    print(f"✅ Archivo {filename} subido como sri_vehiculos.csv")

    # Mostrar información del archivo
    print(f"📊 Tamaño del archivo: {len(content)} bytes")

Sube tu archivo CSV de datos del SRI:


Saving SRI-vehiculos - SRI_Vehiculos_Nuevos.csv to SRI-vehiculos - SRI_Vehiculos_Nuevos.csv
✅ Archivo SRI-vehiculos - SRI_Vehiculos_Nuevos.csv subido como sri_vehiculos.csv
📊 Tamaño del archivo: 75189919 bytes


In [19]:
# Celda 7: Verificar configuración
from google.cloud import bigquery, storage
import pandas as pd

print("🔍 Verificando configuración...")

# Verificar BigQuery
try:
    bq_client = bigquery.Client(project=PROJECT_ID)
    datasets = list(bq_client.list_datasets())
    print(f"✅ BigQuery conectado. Datasets disponibles: {len(datasets)}")
except Exception as e:
    print(f"❌ Error BigQuery: {e}")

# Verificar Cloud Storage
try:
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET_NAME)
    blobs = list(bucket.list_blobs(prefix='raw-data/'))
    print(f"✅ Cloud Storage conectado. Archivos en raw-data/: {len(blobs)}")
    for blob in blobs:
        print(f"   - {blob.name}")
except Exception as e:
    print(f"❌ Error Cloud Storage: {e}")

# Verificar archivo CSV
try:
    blob = bucket.blob('raw-data/sri_vehiculos.csv')
    content = blob.download_as_text()
    df = pd.read_csv(io.StringIO(content))
    print(f"✅ Archivo CSV leído correctamente")
    print(f"   - Filas: {len(df)}")
    print(f"   - Columnas: {len(df.columns)}")
    print(f"   - Columnas disponibles: {list(df.columns[:10])}")  # Primeras 10
except Exception as e:
    print(f"❌ Error leyendo CSV: {e}")

🔍 Verificando configuración...
✅ BigQuery conectado. Datasets disponibles: 1
✅ Cloud Storage conectado. Archivos en raw-data/: 4
   - raw-data/
   - raw-data/.keep
   - raw-data/SRI-vehiculos.xlsx
   - raw-data/sri_vehiculos.csv
✅ Archivo CSV leído correctamente
   - Filas: 460550
   - Columnas: 20
   - Columnas disponibles: ['CATEGORÍA', 'CÓDIGO DE VEHÍCULO', 'TIPO TRANSACCIÓN', 'MARCA', 'MODELO', 'PAÍS', 'AÑO MODELO', 'CLASE', 'SUB CLASE', 'TIPO']


In [27]:
# Solución sin reinstalar ni reiniciar - Parche directo al módulo
import sys
import pendulum

# Crear el método que Airflow espera encontrar
if not hasattr(pendulum.tz, 'timezone') or not callable(pendulum.tz.timezone):
    # Parche: crear la función que Airflow 2.7.0 busca
    def timezone_function(tz_name):
        return pendulum.timezone(tz_name)

    # Aplicar el parche
    pendulum.tz.timezone = timezone_function
    print("✅ Parche aplicado a pendulum.tz.timezone")

# También parchear el módulo ya cargado en sys.modules si existe
if 'pendulum.tz' in sys.modules:
    sys.modules['pendulum.tz'].timezone = timezone_function
    print("✅ Parche aplicado al módulo cargado")

# Limpiar solo los módulos de airflow (no pendulum)
airflow_modules = [mod for mod in sys.modules if 'airflow' in mod]
for mod in airflow_modules:
    del sys.modules[mod]
print(f"✅ {len(airflow_modules)} módulos de airflow limpiados")

✅ Parche aplicado a pendulum.tz.timezone
✅ Parche aplicado al módulo cargado
✅ 42 módulos de airflow limpiados


In [32]:
# SOLUCIÓN DEFINITIVA - Deshabilitar BD de Airflow completamente
import os
import warnings
warnings.filterwarnings("ignore")

# Variables de entorno para deshabilitar funciones que requieren BD
os.environ['AIRFLOW__CORE__UNIT_TEST_MODE'] = 'True'
os.environ['AIRFLOW__CORE__LOAD_EXAMPLES'] = 'False'
os.environ['AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS'] = 'False'
os.environ['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = 'sqlite:///:memory:'
os.environ['AIRFLOW__WEBSERVER__EXPOSE_CONFIG'] = 'False'

# Configurar logging para suprimir errores de BD
import logging
logging.getLogger('airflow').setLevel(logging.CRITICAL)
logging.getLogger('sqlalchemy').setLevel(logging.CRITICAL)

# Parche para evitar queries a task_instance
from unittest.mock import patch, MagicMock

# Mock del DAG para evitar queries a BD
original_dag_init = None

def mock_dag_init(self, *args, **kwargs):
    # Llamar constructor original pero sin validaciones de BD
    try:
        if original_dag_init:
            original_dag_init(self, *args, **kwargs)
    except:
        # Si falla, crear DAG mínimo
        self.dag_id = kwargs.get('dag_id', args[0] if args else 'default')
        self.default_args = kwargs.get('default_args', {})
        self.tasks = []
        self.task_dict = {}
        self._task_group = None

# Importar y patchear
from airflow import DAG as OriginalDAG
original_dag_init = OriginalDAG.__init__

# Aplicar patch
OriginalDAG.__init__ = mock_dag_init

print("✅ Airflow configurado para modo desarrollo sin BD")

✅ Airflow configurado para modo desarrollo sin BD


In [34]:
# SOLUCIÓN DEFINITIVA: Airflow Mock para Google Colab
# Sin dependencias de base de datos, completamente funcional para desarrollo

import warnings
warnings.filterwarnings("ignore")

from datetime import datetime, timedelta
import pandas as pd
from google.cloud import storage, bigquery
import hashlib
from io import StringIO
import logging

# Configurar logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class MockDAG:
    """Simulador de DAG de Airflow sin dependencias de BD"""

    def __init__(self, dag_id, default_args=None, description=None,
                 schedule_interval=None, start_date=None, catchup=False,
                 tags=None, **kwargs):
        self.dag_id = dag_id
        self.default_args = default_args or {}
        self.description = description
        self.schedule_interval = schedule_interval
        self.start_date = start_date
        self.catchup = catchup
        self.tags = tags or []
        self.tasks = []
        self.task_dict = {}
        self._current_dag = None

        logger.info(f"🔧 DAG '{dag_id}' creado exitosamente")

    def __enter__(self):
        MockDAG._current_dag = self
        return self

    def __exit__(self, type, value, traceback):
        MockDAG._current_dag = None
        logger.info(f"✅ DAG '{self.dag_id}' configurado con {len(self.tasks)} tareas")

    def add_task(self, task):
        """Agregar tarea al DAG"""
        self.tasks.append(task)
        self.task_dict[task.task_id] = task
        task.dag = self

    def run_all_tasks(self):
        """Ejecutar todas las tareas del DAG secuencialmente"""
        logger.info(f"🚀 Iniciando ejecución del DAG: {self.dag_id}")
        results = {}

        for task in self.tasks:
            try:
                logger.info(f"▶️  Ejecutando tarea: {task.task_id}")
                result = task.execute()
                results[task.task_id] = result
                logger.info(f"✅ Tarea {task.task_id} completada")
                if result:
                    print(f"   📊 Resultado: {result}")
            except Exception as e:
                logger.error(f"❌ Error en tarea {task.task_id}: {str(e)}")
                results[task.task_id] = f"ERROR: {str(e)}"

        logger.info(f"🎉 DAG {self.dag_id} completado")
        return results


class MockPythonOperator:
    """Simulador de PythonOperator sin dependencias de BD"""

    def __init__(self, task_id, python_callable=None, op_args=None,
                 op_kwargs=None, dag=None, **kwargs):
        self.task_id = task_id
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.dag = dag
        self.kwargs = kwargs

        # Auto-agregar al DAG actual si existe
        if dag:
            dag.add_task(self)
        elif MockDAG._current_dag:
            MockDAG._current_dag.add_task(self)

        logger.info(f"🔧 PythonOperator '{task_id}' registrado")

    def execute(self):
        """Ejecutar la función Python"""
        if self.python_callable:
            try:
                # Crear contexto mock
                context = {
                    'dag': self.dag,
                    'task': self,
                    'task_instance': self,
                    'execution_date': datetime.now(),
                    'ds': datetime.now().strftime('%Y-%m-%d'),
                    'ts': datetime.now().isoformat(),
                }

                # Ejecutar función con argumentos
                if self.op_kwargs:
                    return self.python_callable(*self.op_args, **self.op_kwargs, **context)
                else:
                    return self.python_callable(*self.op_args, **context)
            except Exception as e:
                logger.error(f"Error ejecutando {self.task_id}: {e}")
                raise
        else:
            logger.info(f"Tarea {self.task_id} sin función asociada")
            return None


class MockEmptyOperator:
    """Simulador de EmptyOperator (antes DummyOperator)"""

    def __init__(self, task_id, dag=None, **kwargs):
        self.task_id = task_id
        self.dag = dag
        self.kwargs = kwargs

        # Auto-agregar al DAG actual si existe
        if dag:
            dag.add_task(self)
        elif MockDAG._current_dag:
            MockDAG._current_dag.add_task(self)

        logger.info(f"⭕ EmptyOperator '{task_id}' registrado")

    def execute(self):
        """Simular ejecución vacía"""
        logger.info(f"⏭️  Ejecutando tarea vacía: {self.task_id}")
        return f"Tarea {self.task_id} ejecutada exitosamente"


# Funciones de utilidad para testing
def ejecutar_tarea_individual(dag, task_id):
    """Ejecutar una tarea específica del DAG"""
    if task_id in dag.task_dict:
        task = dag.task_dict[task_id]
        logger.info(f"🎯 Ejecutando tarea individual: {task_id}")
        return task.execute()
    else:
        logger.error(f"❌ Tarea '{task_id}' no encontrada en el DAG")
        return None

def listar_tareas_dag(dag):
    """Mostrar todas las tareas de un DAG"""
    print(f"\n📋 Tareas en DAG '{dag.dag_id}':")
    for i, task in enumerate(dag.tasks, 1):
        task_type = "Python" if hasattr(task, 'python_callable') else "Empty"
        print(f"  {i}. {task.task_id} ({task_type})")
    print()

def validar_estructura_dag(dag):
    """Validar que el DAG esté bien estructurado"""
    print(f"\n🔍 Validando DAG: {dag.dag_id}")
    print(f"  ✓ Número de tareas: {len(dag.tasks)}")
    print(f"  ✓ Schedule: {dag.schedule_interval}")
    print(f"  ✓ Start date: {dag.start_date}")
    print(f"  ✓ Tags: {dag.tags}")

    # Validar tareas Python
    python_tasks = [t for t in dag.tasks if hasattr(t, 'python_callable')]
    empty_tasks = [t for t in dag.tasks if not hasattr(t, 'python_callable')]

    print(f"  ✓ Tareas Python: {len(python_tasks)}")
    print(f"  ✓ Tareas Empty: {len(empty_tasks)}")

    # Verificar funciones Python
    missing_functions = [t.task_id for t in python_tasks if t.python_callable is None]
    if missing_functions:
        print(f"  ⚠️  Tareas sin función: {missing_functions}")
    else:
        print("  ✅ Todas las tareas Python tienen funciones asignadas")

    print("✅ Validación completada\n")

# Crear aliases para compatibilidad con código Airflow existente
DAG = MockDAG
PythonOperator = MockPythonOperator
EmptyOperator = MockEmptyOperator
DummyOperator = MockEmptyOperator  # Alias para compatibilidad

# Variable global para DAG actual
MockDAG._current_dag = None

print("🎉 ¡Airflow Mock cargado exitosamente!")
print("✅ Puedes usar DAG, PythonOperator y EmptyOperator normalmente")
print("🧪 Funciones adicionales:")
print("   - dag.run_all_tasks() - Ejecutar todo el DAG")
print("   - ejecutar_tarea_individual(dag, 'task_id') - Ejecutar una tarea")
print("   - listar_tareas_dag(dag) - Ver todas las tareas")
print("   - validar_estructura_dag(dag) - Validar el DAG")

🎉 ¡Airflow Mock cargado exitosamente!
✅ Puedes usar DAG, PythonOperator y EmptyOperator normalmente
🧪 Funciones adicionales:
   - dag.run_all_tasks() - Ejecutar todo el DAG
   - ejecutar_tarea_individual(dag, 'task_id') - Ejecutar una tarea
   - listar_tareas_dag(dag) - Ver todas las tareas
   - validar_estructura_dag(dag) - Validar el DAG


In [35]:
# SRI_Vehiculos_ETL_DAG.py
# Implementación completa del proceso ETL para datos vehiculares del SRI

import warnings
warnings.filterwarnings("ignore")

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
import pandas as pd
from google.cloud import storage, bigquery
import hashlib
from io import StringIO
import logging
DummyOperator = EmptyOperator


# Configuración por defecto del DAG
default_args = {
    'owner': 'sri_data_engineer',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# Definición del DAG
dag = DAG(
    'sri_vehiculos_etl_proceso',
    default_args=default_args,
    description='Proceso ETL completo para datos vehiculares del SRI',
    schedule_interval='@daily',
    catchup=False,
    tags=['sri', 'vehiculos', 'etl', 'bigquery'],
    max_active_runs=1
)

# Variables de configuración
PROJECT_ID = 'sri-vehiculos-etl'  # Reemplazar con tu project ID
DATASET_ID = 'sri_vehiculos_dw'
BUCKET_NAME = 'sri-vehiculos-etl-bucket-angel'  # Reemplazar con tu bucket

# ===============================
# FUNCIONES ETL PARA DIMENSIONES
# ===============================

def etl_dim_tiempo(**context):
    """
    Proceso ETL para la dimensión Tiempo
    Genera un rango completo de fechas desde 2020 hasta 2025
    """
    try:
        logging.info("🕐 Iniciando ETL para Dim_Tiempo...")

        # Configurar cliente de BigQuery
        client = bigquery.Client(project=PROJECT_ID)

        # Generar rango de fechas
        start_date = datetime(2020, 1, 1)
        end_date = datetime(2025, 12, 31)
        fechas = pd.date_range(start=start_date, end=end_date, freq='D')

        logging.info(f"📅 Generando {len(fechas)} registros de fechas...")

        # Crear DataFrame de dimensión tiempo
        dim_tiempo = pd.DataFrame({
            'ID_Tiempo': range(1, len(fechas) + 1),
            'FechaCompleta': fechas.date,
            'Anio': fechas.year,
            'Trimestre': fechas.quarter,
            'Mes': fechas.month,
            'Dia': fechas.day,
            'NombreMes': fechas.strftime('%B'),
            'NombreDiaSemana': fechas.strftime('%A')
        })

        # Traducir nombres al español
        meses_es = {
            'January': 'Enero', 'February': 'Febrero', 'March': 'Marzo',
            'April': 'Abril', 'May': 'Mayo', 'June': 'Junio',
            'July': 'Julio', 'August': 'Agosto', 'September': 'Septiembre',
            'October': 'Octubre', 'November': 'Noviembre', 'December': 'Diciembre'
        }

        dias_es = {
            'Monday': 'Lunes', 'Tuesday': 'Martes', 'Wednesday': 'Miércoles',
            'Thursday': 'Jueves', 'Friday': 'Viernes', 'Saturday': 'Sábado',
            'Sunday': 'Domingo'
        }

        dim_tiempo['NombreMes'] = dim_tiempo['NombreMes'].map(meses_es)
        dim_tiempo['NombreDiaSemana'] = dim_tiempo['NombreDiaSemana'].map(dias_es)

        # Cargar a BigQuery
        table_id = f'{PROJECT_ID}.{DATASET_ID}.dim_tiempo'
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_TRUNCATE",
            schema=[
                bigquery.SchemaField("ID_Tiempo", "INTEGER"),
                bigquery.SchemaField("FechaCompleta", "DATE"),
                bigquery.SchemaField("Anio", "INTEGER"),
                bigquery.SchemaField("Trimestre", "INTEGER"),
                bigquery.SchemaField("Mes", "INTEGER"),
                bigquery.SchemaField("Dia", "INTEGER"),
                bigquery.SchemaField("NombreMes", "STRING"),
                bigquery.SchemaField("NombreDiaSemana", "STRING"),
            ]
        )

        job = client.load_table_from_dataframe(dim_tiempo, table_id, job_config=job_config)
        job.result()  # Esperar a que termine

        logging.info(f"✅ Cargados {len(dim_tiempo)} registros en dim_tiempo")
        return f"Dim_Tiempo cargada exitosamente: {len(dim_tiempo)} registros"

    except Exception as e:
        logging.error(f"❌ Error en ETL Dim_Tiempo: {str(e)}")
        raise

def etl_dim_vehiculo(**context):
    """
    Proceso ETL para la dimensión Vehículo
    Extrae características únicas de vehículos del archivo CSV
    """
    try:
        logging.info("🚗 Iniciando ETL para Dim_Vehiculo...")

        # Configurar clientes
        storage_client = storage.Client()
        bigquery_client = bigquery.Client(project=PROJECT_ID)

        # Extraer datos del bucket
        bucket = storage_client.bucket(BUCKET_NAME)
        blob = bucket.blob('raw-data/sri_vehiculos.csv')

        # Leer CSV desde Cloud Storage
        content = blob.download_as_text()
        df = pd.read_csv(StringIO(content))

        logging.info(f"📊 Datos extraídos: {len(df)} registros originales")

        # Seleccionar columnas para la dimensión vehículo
        columnas_vehiculo = [
            'CÓDIGO DE VEHÍCULO', 'MARCA', 'MODELO', 'PAÍS',
            'AÑO MODELO', 'CLASE', 'SUB CLASE', 'TIPO',
            'CILINDRAJE', 'TIPO COMBUSTIBLE', 'COLOR 1', 'COLOR 2'
        ]

        # Verificar que las columnas existen
        columnas_existentes = [col for col in columnas_vehiculo if col in df.columns]
        if len(columnas_existentes) != len(columnas_vehiculo):
            logging.warning(f"Algunas columnas no encontradas. Usando: {columnas_existentes}")

        # Crear dimensión con registros únicos
        dim_vehiculo = df[columnas_existentes].drop_duplicates().reset_index(drop=True)

        # Generar clave subrogada
        dim_vehiculo['ID_Vehiculo'] = range(1, len(dim_vehiculo) + 1)

        # Limpiar y estandarizar datos
        for col in ['MARCA', 'MODELO', 'PAÍS', 'CLASE', 'SUB CLASE', 'TIPO', 'TIPO COMBUSTIBLE']:
            if col in dim_vehiculo.columns:
                dim_vehiculo[col] = dim_vehiculo[col].astype(str).str.upper().str.strip()

        # Manejar valores nulos
        if 'COLOR 2' in dim_vehiculo.columns:
            dim_vehiculo['COLOR 2'] = dim_vehiculo['COLOR 2'].fillna('N/A')

        # Renombrar columnas para BigQuery (sin espacios ni caracteres especiales)
        rename_dict = {
            'CÓDIGO DE VEHÍCULO': 'CodigoVehiculo',
            'MARCA': 'Marca',
            'MODELO': 'Modelo',
            'PAÍS': 'Pais',
            'AÑO MODELO': 'AnioModelo',
            'CLASE': 'Clase',
            'SUB CLASE': 'SubClase',
            'TIPO': 'Tipo',
            'CILINDRAJE': 'Cilindraje',
            'TIPO COMBUSTIBLE': 'TipoCombustible',
            'COLOR 1': 'Color1',
            'COLOR 2': 'Color2'
        }

        # Solo renombrar columnas que existen
        rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in dim_vehiculo.columns}
        dim_vehiculo = dim_vehiculo.rename(columns=rename_dict_filtered)

        # Reordenar columnas (solo las que existen)
        columnas_orden = ['ID_Vehiculo'] + [v for k, v in rename_dict_filtered.items()]
        dim_vehiculo = dim_vehiculo[columnas_orden]

        logging.info(f"🔧 Transformación completada: {len(dim_vehiculo)} vehículos únicos")

        # Cargar a BigQuery
        table_id = f'{PROJECT_ID}.{DATASET_ID}.dim_vehiculo'
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

        job = bigquery_client.load_table_from_dataframe(dim_vehiculo, table_id, job_config=job_config)
        job.result()

        logging.info(f"✅ Cargados {len(dim_vehiculo)} registros en dim_vehiculo")
        return f"Dim_Vehiculo cargada exitosamente: {len(dim_vehiculo)} registros"

    except Exception as e:
        logging.error(f"❌ Error en ETL Dim_Vehiculo: {str(e)}")
        raise

def etl_dim_transaccion(**context):
    """
    Proceso ETL para la dimensión Transacción
    Crea combinaciones únicas de tipos de transacción y servicio
    """
    try:
        logging.info("💼 Iniciando ETL para Dim_Transaccion...")

        # Configurar clientes
        storage_client = storage.Client()
        bigquery_client = bigquery.Client(project=PROJECT_ID)

        # Extraer datos del bucket
        bucket = storage_client.bucket(BUCKET_NAME)
        blob = bucket.blob('raw-data/sri_vehiculos.csv')

        content = blob.download_as_text()
        df = pd.read_csv(StringIO(content))

        # Seleccionar columnas para dimensión transacción
        columnas_transaccion = [
            'TIPO TRANSACCIÓN', 'TIPO SERVICIO',
            'PERSONA NATURAL - JURÍDICA', 'CATEGORÍA'
        ]

        # Verificar columnas existentes
        columnas_existentes = [col for col in columnas_transaccion if col in df.columns]
        logging.info(f"Columnas encontradas: {columnas_existentes}")

        # Crear dimensión con combinaciones únicas
        dim_transaccion = df[columnas_existentes].drop_duplicates().reset_index(drop=True)

        # Generar clave subrogada
        dim_transaccion['ID_Transaccion'] = range(1, len(dim_transaccion) + 1)

        # Limpiar datos
        for col in columnas_existentes:
            if col in dim_transaccion.columns:
                dim_transaccion[col] = dim_transaccion[col].astype(str).str.upper().str.strip()

        # Renombrar columnas
        rename_dict = {
            'TIPO TRANSACCIÓN': 'TipoTransaccion',
            'TIPO SERVICIO': 'TipoServicio',
            'PERSONA NATURAL - JURÍDICA': 'PersonaTipo',
            'CATEGORÍA': 'Categoria'
        }

        rename_dict_filtered = {k: v for k, v in rename_dict.items() if k in dim_transaccion.columns}
        dim_transaccion = dim_transaccion.rename(columns=rename_dict_filtered)

        # Reordenar columnas
        columnas_orden = ['ID_Transaccion'] + [v for k, v in rename_dict_filtered.items()]
        dim_transaccion = dim_transaccion[columnas_orden]

        logging.info(f"🔧 Transformación completada: {len(dim_transaccion)} tipos de transacción únicos")

        # Cargar a BigQuery
        table_id = f'{PROJECT_ID}.{DATASET_ID}.dim_transaccion'
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

        job = bigquery_client.load_table_from_dataframe(dim_transaccion, table_id, job_config=job_config)
        job.result()

        logging.info(f"✅ Cargados {len(dim_transaccion)} registros en dim_transaccion")
        return f"Dim_Transaccion cargada exitosamente: {len(dim_transaccion)} registros"

    except Exception as e:
        logging.error(f"❌ Error en ETL Dim_Transaccion: {str(e)}")
        raise

def etl_dim_ubicacion(**context):
    """
    Proceso ETL para la dimensión Ubicación
    Mapea códigos de cantón a información geográfica completa
    """
    try:
        logging.info("🌎 Iniciando ETL para Dim_Ubicacion...")

        # Configurar clientes
        storage_client = storage.Client()
        bigquery_client = bigquery.Client(project=PROJECT_ID)

        # Extraer datos del bucket
        bucket = storage_client.bucket(BUCKET_NAME)
        blob = bucket.blob('raw-data/sri_vehiculos.csv')

        content = blob.download_as_text()
        df = pd.read_csv(StringIO(content))

        # Mapeo de cantones expandido
        mapeo_cantones = {
            '10701': {'canton': 'CUENCA', 'provincia': 'AZUAY', 'region': 'SIERRA'},
            '10911': {'canton': 'GIRON', 'provincia': 'AZUAY', 'region': 'SIERRA'},
            '10901': {'canton': 'GUALACEO', 'provincia': 'AZUAY', 'region': 'SIERRA'},
            '10927': {'canton': 'SANTA ISABEL', 'provincia': 'AZUAY', 'region': 'SIERRA'},
            '20606': {'canton': 'PLAYAS', 'provincia': 'GUAYAS', 'region': 'COSTA'},
            '21101': {'canton': 'GUAYAQUIL', 'provincia': 'GUAYAS', 'region': 'COSTA'},
            '21709': {'canton': 'MILAGRO', 'provincia': 'GUAYAS', 'region': 'COSTA'},
            '31905': {'canton': 'ZAMORA', 'provincia': 'ZAMORA CHINCHIPE', 'region': 'AMAZONIA'},
            '20501': {'canton': 'QUITO', 'provincia': 'PICHINCHA', 'region': 'SIERRA'},
            '20505': {'canton': 'CAYAMBE', 'provincia': 'PICHINCHA', 'region': 'SIERRA'},
            '30101': {'canton': 'LAGO AGRIO', 'provincia': 'SUCUMBIOS', 'region': 'AMAZONIA'},
            '30201': {'canton': 'GONZALO PIZARRO', 'provincia': 'SUCUMBIOS', 'region': 'AMAZONIA'},
            '30301': {'canton': 'PUTUMAYO', 'provincia': 'SUCUMBIOS', 'region': 'AMAZONIA'},
            '30401': {'canton': 'SHUSHUFINDI', 'provincia': 'SUCUMBIOS', 'region': 'AMAZONIA'},
            '30501': {'canton': 'SUCUMBIOS', 'provincia': 'SUCUMBIOS', 'region': 'AMAZONIA'},
            '30601': {'canton': 'CASCALES', 'provincia': 'SUCUMBIOS', 'region': 'AMAZONIA'},
            '30701': {'canton': 'CUYABENO', 'provincia': 'SUCUMBIOS', 'region': 'AMAZONIA'},
        }

        # Verificar si la columna CANTON existe
        col_canton = None
        for col in ['CANTON', 'CANTÓN', 'canton', 'cantón']:
            if col in df.columns:
                col_canton = col
                break

        if col_canton is None:
            logging.warning("No se encontró columna de cantón. Usando ubicación genérica.")
            # Crear una ubicación por defecto
            dim_ubicacion = pd.DataFrame([{
                'ID_Ubicacion': 1,
                'CodigoCanton': '99999',
                'NombreCanton': 'NO_ESPECIFICADO',
                'Provincia': 'NO_ESPECIFICADA',
                'Region': 'NO_ESPECIFICADA',
                'Pais': 'ECUADOR'
            }])
        else:
            # Obtener cantones únicos del dataset
            cantones_dataset = df[col_canton].dropna().unique()

            # Crear dimensión ubicación
            ubicaciones = []
            id_counter = 1

            for codigo_canton in cantones_dataset:
                codigo_str = str(codigo_canton).strip()
                if codigo_str in mapeo_cantones:
                    info = mapeo_cantones[codigo_str]
                    ubicaciones.append({
                        'ID_Ubicacion': id_counter,
                        'CodigoCanton': codigo_str,
                        'NombreCanton': info['canton'],
                        'Provincia': info['provincia'],
                        'Region': info['region'],
                        'Pais': 'ECUADOR'
                    })
                else:
                    # Para cantones no mapeados, crear entrada genérica
                    ubicaciones.append({
                        'ID_Ubicacion': id_counter,
                        'CodigoCanton': codigo_str,
                        'NombreCanton': f'CANTON_{codigo_str}',
                        'Provincia': 'NO_IDENTIFICADA',
                        'Region': 'NO_IDENTIFICADA',
                        'Pais': 'ECUADOR'
                    })
                id_counter += 1

            dim_ubicacion = pd.DataFrame(ubicaciones)

        logging.info(f"🔧 Transformación completada: {len(dim_ubicacion)} ubicaciones únicas")

        # Cargar a BigQuery
        table_id = f'{PROJECT_ID}.{DATASET_ID}.dim_ubicacion'
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

        job = bigquery_client.load_table_from_dataframe(dim_ubicacion, table_id, job_config=job_config)
        job.result()

        logging.info(f"✅ Cargados {len(dim_ubicacion)} registros en dim_ubicacion")
        return f"Dim_Ubicacion cargada exitosamente: {len(dim_ubicacion)} registros"

    except Exception as e:
        logging.error(f"❌ Error en ETL Dim_Ubicacion: {str(e)}")
        raise

# ===============================
# FUNCIÓN ETL PARA TABLA DE HECHOS
# ===============================

def etl_fact_registro_vehiculos(**context):
    """
    Proceso ETL para la tabla de hechos Fact_RegistroVehiculos
    Realiza lookups con las dimensiones y carga métricas
    """
    try:
        logging.info("📊 Iniciando ETL para Fact_RegistroVehiculos...")

        # Configurar clientes
        storage_client = storage.Client()
        bigquery_client = bigquery.Client(project=PROJECT_ID)

        # Extraer datos principales del bucket
        bucket = storage_client.bucket(BUCKET_NAME)
        blob = bucket.blob('raw-data/sri_vehiculos.csv')

        content = blob.download_as_text()
        df_hechos = pd.read_csv(StringIO(content))

        logging.info(f"📊 Datos extraídos: {len(df_hechos)} registros de hechos")

        # Cargar dimensiones desde BigQuery para lookups
        logging.info("🔍 Cargando dimensiones para lookups...")

        try:
            # Cargar Dim_Tiempo
            query_tiempo = f"SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.dim_tiempo`"
            dim_tiempo = bigquery_client.query(query_tiempo).to_dataframe()

            # Cargar Dim_Vehiculo
            query_vehiculo = f"SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.dim_vehiculo`"
            dim_vehiculo = bigquery_client.query(query_vehiculo).to_dataframe()

            # Cargar Dim_Transaccion
            query_transaccion = f"SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.dim_transaccion`"
            dim_transaccion = bigquery_client.query(query_transaccion).to_dataframe()

            # Cargar Dim_Ubicacion
            query_ubicacion = f"SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.dim_ubicacion`"
            dim_ubicacion = bigquery_client.query(query_ubicacion).to_dataframe()

            logging.info("✅ Dimensiones cargadas para lookups")

        except Exception as e:
            logging.error(f"Error cargando dimensiones: {str(e)}")
            raise

        # Procesamiento de fechas
        logging.info("📅 Procesando fechas...")

        # Buscar columna de fecha
        col_fecha = None
        for col in ['FECHA PROCESO', 'FECHA_PROCESO', 'fecha_proceso', 'FECHA']:
            if col in df_hechos.columns:
                col_fecha = col
                break

        if col_fecha:
            try:
                df_hechos['FECHA_PROCESO_CONV'] = pd.to_datetime(df_hechos[col_fecha], errors='coerce')
                # Filtrar fechas válidas
                df_hechos = df_hechos.dropna(subset=['FECHA_PROCESO_CONV'])
                df_hechos['FECHA_PROCESO_DATE'] = df_hechos['FECHA_PROCESO_CONV'].dt.date
            except Exception as e:
                logging.warning(f"Error procesando fechas: {str(e)}. Usando fecha por defecto.")
                df_hechos['FECHA_PROCESO_DATE'] = datetime.now().date()
        else:
            logging.warning("No se encontró columna de fecha. Usando fecha actual.")
            df_hechos['FECHA_PROCESO_DATE'] = datetime.now().date()

        # Realizar lookups con dimensiones
        logging.info("🔗 Realizando lookups con dimensiones...")

        # Lookup con Dim_Tiempo
        df_hechos = df_hechos.merge(
            dim_tiempo[['ID_Tiempo', 'FechaCompleta']],
            left_on='FECHA_PROCESO_DATE',
            right_on='FechaCompleta',
            how='left'
        )

        # Lookup con Dim_Vehiculo (usando código de vehículo)
        col_codigo_vehiculo = None
        for col in ['CÓDIGO DE VEHÍCULO', 'CODIGO_VEHICULO', 'codigo_vehiculo']:
            if col in df_hechos.columns:
                col_codigo_vehiculo = col
                break

        if col_codigo_vehiculo:
            df_hechos = df_hechos.merge(
                dim_vehiculo[['ID_Vehiculo', 'CodigoVehiculo']],
                left_on=col_codigo_vehiculo,
                right_on='CodigoVehiculo',
                how='left'
            )
        else:
            df_hechos['ID_Vehiculo'] = 1  # ID por defecto

        # Lookup con Dim_Transaccion
        merge_cols = []
        if 'TIPO TRANSACCIÓN' in df_hechos.columns and 'TipoTransaccion' in dim_transaccion.columns:
            merge_cols.append(('TIPO TRANSACCIÓN', 'TipoTransaccion'))
        if 'TIPO SERVICIO' in df_hechos.columns and 'TipoServicio' in dim_transaccion.columns:
            merge_cols.append(('TIPO SERVICIO', 'TipoServicio'))

        if merge_cols:
            left_cols = [col[0] for col in merge_cols]
            right_cols = [col[1] for col in merge_cols]
            df_hechos = df_hechos.merge(
                dim_transaccion[['ID_Transaccion'] + right_cols],
                left_on=left_cols,
                right_on=right_cols,
                how='left'
            )
        else:
            df_hechos['ID_Transaccion'] = 1  # ID por defecto

        # Lookup con Dim_Ubicacion
        col_canton = None
        for col in ['CANTON', 'CANTÓN', 'canton']:
            if col in df_hechos.columns:
                col_canton = col
                break

        if col_canton:
            df_hechos[col_canton] = df_hechos[col_canton].astype(str)
            df_hechos = df_hechos.merge(
                dim_ubicacion[['ID_Ubicacion', 'CodigoCanton']],
                left_on=col_canton,
                right_on='CodigoCanton',
                how='left'
            )
        else:
            df_hechos['ID_Ubicacion'] = 1  # ID por defecto

        # Crear tabla de hechos final
        logging.info("📋 Creando tabla de hechos final...")

        # Generar ID único para cada registro
        df_hechos['ID_Registro'] = range(1, len(df_hechos) + 1)

        # Calcular métricas
        df_hechos['CantidadRegistros'] = 1

        # Buscar columna de avalúo
        col_avaluo = None
        for col in ['AVALUO', 'AVALÚO', 'avaluo', 'avalúo']:
            if col in df_hechos.columns:
                col_avaluo = col
                break

        if col_avaluo:
            df_hechos['MontoAvaluo'] = pd.to_numeric(df_hechos[col_avaluo], errors='coerce').fillna(0)
        else:
            df_hechos['MontoAvaluo'] = 0

        # Seleccionar columnas finales para la tabla de hechos
        columnas_fact = [
            'ID_Registro',
            'ID_Tiempo',
            'ID_Vehiculo',
            'ID_Transaccion',
            'ID_Ubicacion',
            'CantidadRegistros',
            'MontoAvaluo'
        ]

        # Verificar que todas las columnas existen
        columnas_existentes = [col for col in columnas_fact if col in df_hechos.columns]
        fact_table = df_hechos[columnas_existentes].copy()

        # Llenar valores nulos con defaults
        for col in ['ID_Tiempo', 'ID_Vehiculo', 'ID_Transaccion', 'ID_Ubicacion']:
            if col in fact_table.columns:
                fact_table[col] = fact_table[col].fillna(1)

        fact_table = fact_table.fillna(0)

        logging.info(f"🔧 Tabla de hechos creada: {len(fact_table)} registros")

        # Cargar a BigQuery
        table_id = f'{PROJECT_ID}.{DATASET_ID}.fact_registro_vehiculos'
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

        job = bigquery_client.load_table_from_dataframe(fact_table, table_id, job_config=job_config)
        job.result()

        logging.info(f"✅ Cargados {len(fact_table)} registros en fact_registro_vehiculos")
        return f"Fact_RegistroVehiculos cargada exitosamente: {len(fact_table)} registros"

    except Exception as e:
        logging.error(f"❌ Error en ETL Fact_RegistroVehiculos: {str(e)}")
        raise

# ===============================
# DEFINICIÓN DE TAREAS DEL DAG
# ===============================

# Tarea de inicio
inicio = DummyOperator(
    task_id='inicio_proceso_etl',
    dag=dag
)

# Tareas ETL para dimensiones
tarea_dim_tiempo = PythonOperator(
    task_id='etl_dim_tiempo',
    python_callable=etl_dim_tiempo,
    dag=dag
)

tarea_dim_vehiculo = PythonOperator(
    task_id='etl_dim_vehiculo',
    python_callable=etl_dim_vehiculo,
    dag=dag
)

tarea_dim_transaccion = PythonOperator(
    task_id='etl_dim_transaccion',
    python_callable=etl_dim_transaccion,
    dag=dag
)

tarea_dim_ubicacion = PythonOperator(
    task_id='etl_dim_ubicacion',
    python_callable=etl_dim_ubicacion,
    dag=dag
)

# Tarea de sincronización para dimensiones
sincronizacion_dimensiones = DummyOperator(
    task_id='sincronizacion_dimensiones',
    dag=dag
)

# Tarea ETL para tabla de hechos
tarea_fact_registro = PythonOperator(
    task_id='etl_fact_registro_vehiculos',
    python_callable=etl_fact_registro_vehiculos,
    dag=dag
)

# Tarea de finalización
finalizacion = DummyOperator(
    task_id='finalizacion_proceso_etl',
    dag=dag
)

# ===============================
# FUNCIONES DE VALIDACIÓN Y MONITOREO
# ===============================

def validar_calidad_datos(**context):
    """
    Función para validar la calidad de los datos cargados
    """
    try:
        logging.info("🔍 Iniciando validación de calidad de datos...")

        client = bigquery.Client(project=PROJECT_ID)

        # Validaciones para dimensiones
        validaciones = []

        # Validar Dim_Tiempo
        query_tiempo = f"""
        SELECT
            COUNT(*) as total_registros,
            COUNT(DISTINCT Anio) as anios_unicos,
            MIN(FechaCompleta) as fecha_min,
            MAX(FechaCompleta) as fecha_max
        FROM `{PROJECT_ID}.{DATASET_ID}.dim_tiempo`
        """

        result_tiempo = client.query(query_tiempo).to_dataframe()
        validaciones.append(f"Dim_Tiempo: {result_tiempo.iloc[0]['total_registros']} registros, "
                          f"años {result_tiempo.iloc[0]['anios_unicos']}, "
                          f"rango: {result_tiempo.iloc[0]['fecha_min']} a {result_tiempo.iloc[0]['fecha_max']}")

        # Validar Dim_Vehiculo
        query_vehiculo = f"""
        SELECT
            COUNT(*) as total_registros,
            COUNT(DISTINCT Marca) as marcas_unicas,
            COUNT(DISTINCT Clase) as clases_unicas
        FROM `{PROJECT_ID}.{DATASET_ID}.dim_vehiculo`
        """

        result_vehiculo = client.query(query_vehiculo).to_dataframe()
        validaciones.append(f"Dim_Vehiculo: {result_vehiculo.iloc[0]['total_registros']} registros, "
                          f"{result_vehiculo.iloc[0]['marcas_unicas']} marcas, "
                          f"{result_vehiculo.iloc[0]['clases_unicas']} clases")

        # Validar Dim_Transaccion
        query_transaccion = f"""
        SELECT
            COUNT(*) as total_registros,
            COUNT(DISTINCT TipoTransaccion) as tipos_transaccion
        FROM `{PROJECT_ID}.{DATASET_ID}.dim_transaccion`
        """

        result_transaccion = client.query(query_transaccion).to_dataframe()
        validaciones.append(f"Dim_Transaccion: {result_transaccion.iloc[0]['total_registros']} registros, "
                          f"{result_transaccion.iloc[0]['tipos_transaccion']} tipos de transacción")

        # Validar Dim_Ubicacion
        query_ubicacion = f"""
        SELECT
            COUNT(*) as total_registros,
            COUNT(DISTINCT Provincia) as provincias_unicas,
            COUNT(DISTINCT Region) as regiones_unicas
        FROM `{PROJECT_ID}.{DATASET_ID}.dim_ubicacion`
        """

        result_ubicacion = client.query(query_ubicacion).to_dataframe()
        validaciones.append(f"Dim_Ubicacion: {result_ubicacion.iloc[0]['total_registros']} registros, "
                          f"{result_ubicacion.iloc[0]['provincias_unicas']} provincias, "
                          f"{result_ubicacion.iloc[0]['regiones_unicas']} regiones")

        # Validar Fact_RegistroVehiculos
        query_fact = f"""
        SELECT
            COUNT(*) as total_registros,
            SUM(CantidadRegistros) as total_cantidad,
            AVG(MontoAvaluo) as avaluo_promedio,
            COUNT(CASE WHEN ID_Tiempo IS NULL THEN 1 END) as registros_sin_tiempo,
            COUNT(CASE WHEN ID_Vehiculo IS NULL THEN 1 END) as registros_sin_vehiculo
        FROM `{PROJECT_ID}.{DATASET_ID}.fact_registro_vehiculos`
        """

        result_fact = client.query(query_fact).to_dataframe()
        validaciones.append(f"Fact_RegistroVehiculos: {result_fact.iloc[0]['total_registros']} registros, "
                          f"cantidad total: {result_fact.iloc[0]['total_cantidad']}, "
                          f"avalúo promedio: ${result_fact.iloc[0]['avaluo_promedio']:,.2f}")

        # Log de todas las validaciones
        for validacion in validaciones:
            logging.info(f"✅ {validacion}")

        # Verificar integridad referencial
        query_integridad = f"""
        SELECT
            COUNT(*) as registros_con_claves_validas
        FROM `{PROJECT_ID}.{DATASET_ID}.fact_registro_vehiculos` f
        INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_tiempo` t ON f.ID_Tiempo = t.ID_Tiempo
        INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_vehiculo` v ON f.ID_Vehiculo = v.ID_Vehiculo
        INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_transaccion` tr ON f.ID_Transaccion = tr.ID_Transaccion
        INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_ubicacion` u ON f.ID_Ubicacion = u.ID_Ubicacion
        """

        result_integridad = client.query(query_integridad).to_dataframe()
        registros_validos = result_integridad.iloc[0]['registros_con_claves_validas']

        logging.info(f"🔗 Integridad referencial: {registros_validos} registros con todas las claves válidas")

        resumen_validacion = {
            'validaciones': validaciones,
            'registros_con_integridad': registros_validos,
            'timestamp': datetime.now().isoformat()
        }

        return resumen_validacion

    except Exception as e:
        logging.error(f"❌ Error en validación de calidad: {str(e)}")
        raise

def generar_metricas_negocio(**context):
    """
    Genera métricas de negocio del proceso ETL
    """
    try:
        logging.info("📈 Generando métricas de negocio...")

        client = bigquery.Client(project=PROJECT_ID)

        # Métricas por año
        query_por_anio = f"""
        SELECT
            t.Anio,
            COUNT(*) as total_registros,
            SUM(f.MontoAvaluo) as monto_total_avaluo,
            AVG(f.MontoAvaluo) as monto_promedio_avaluo
        FROM `{PROJECT_ID}.{DATASET_ID}.fact_registro_vehiculos` f
        INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_tiempo` t ON f.ID_Tiempo = t.ID_Tiempo
        GROUP BY t.Anio
        ORDER BY t.Anio DESC
        LIMIT 5
        """

        metricas_anio = client.query(query_por_anio).to_dataframe()

        # Métricas por marca
        query_por_marca = f"""
        SELECT
            v.Marca,
            COUNT(*) as total_registros,
            AVG(f.MontoAvaluo) as avaluo_promedio
        FROM `{PROJECT_ID}.{DATASET_ID}.fact_registro_vehiculos` f
        INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_vehiculo` v ON f.ID_Vehiculo = v.ID_Vehiculo
        GROUP BY v.Marca
        ORDER BY total_registros DESC
        LIMIT 10
        """

        metricas_marca = client.query(query_por_marca).to_dataframe()

        # Métricas por provincia
        query_por_provincia = f"""
        SELECT
            u.Provincia,
            u.Region,
            COUNT(*) as total_registros,
            SUM(f.MontoAvaluo) as monto_total
        FROM `{PROJECT_ID}.{DATASET_ID}.fact_registro_vehiculos` f
        INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_ubicacion` u ON f.ID_Ubicacion = u.ID_Ubicacion
        GROUP BY u.Provincia, u.Region
        ORDER BY total_registros DESC
        LIMIT 10
        """

        metricas_provincia = client.query(query_por_provincia).to_dataframe()

        # Log de métricas
        logging.info("📊 MÉTRICAS POR AÑO:")
        for _, row in metricas_anio.iterrows():
            logging.info(f"   {row['Anio']}: {row['total_registros']} registros, "
                        f"avalúo total: ${row['monto_total_avaluo']:,.2f}")

        logging.info("🚗 TOP MARCAS:")
        for _, row in metricas_marca.iterrows():
            logging.info(f"   {row['Marca']}: {row['total_registros']} registros, "
                        f"avalúo promedio: ${row['avaluo_promedio']:,.2f}")

        logging.info("🌎 TOP PROVINCIAS:")
        for _, row in metricas_provincia.iterrows():
            logging.info(f"   {row['Provincia']} ({row['Region']}): {row['total_registros']} registros")

        metricas_resumen = {
            'metricas_por_anio': metricas_anio.to_dict('records'),
            'metricas_por_marca': metricas_marca.to_dict('records'),
            'metricas_por_provincia': metricas_provincia.to_dict('records'),
            'timestamp': datetime.now().isoformat()
        }

        return metricas_resumen

    except Exception as e:
        logging.error(f"❌ Error generando métricas: {str(e)}")
        raise

def notificar_finalizacion(**context):
    """
    Notifica la finalización exitosa del proceso ETL
    """
    try:
        logging.info("📧 Enviando notificación de finalización...")

        # Obtener información del contexto
        dag_run = context['dag_run']
        execution_date = context['execution_date']

        # Crear resumen del proceso
        resumen = {
            'dag_id': dag_run.dag_id,
            'execution_date': execution_date.isoformat(),
            'estado': 'EXITOSO',
            'duracion_total': str(datetime.now() - dag_run.start_date) if dag_run.start_date else 'N/A',
            'timestamp_finalizacion': datetime.now().isoformat()
        }

        logging.info("✅ PROCESO ETL FINALIZADO EXITOSAMENTE")
        logging.info(f"   DAG: {resumen['dag_id']}")
        logging.info(f"   Fecha de ejecución: {resumen['execution_date']}")
        logging.info(f"   Duración: {resumen['duracion_total']}")
        logging.info(f"   Estado: {resumen['estado']}")

        # Aquí se puede agregar lógica para enviar emails, Slack, etc.
        # Por ejemplo:
        # send_email_notification(resumen)
        # send_slack_notification(resumen)

        return resumen

    except Exception as e:
        logging.error(f"❌ Error en notificación: {str(e)}")
        raise

# ===============================
# TAREAS DE VALIDACIÓN Y MONITOREO
# ===============================

tarea_validacion = PythonOperator(
    task_id='validar_calidad_datos',
    python_callable=validar_calidad_datos,
    dag=dag
)

tarea_metricas = PythonOperator(
    task_id='generar_metricas_negocio',
    python_callable=generar_metricas_negocio,
    dag=dag
)

tarea_notificacion = PythonOperator(
    task_id='notificar_finalizacion',
    python_callable=notificar_finalizacion,
    dag=dag
)

# ===============================
# DEFINICIÓN DE DEPENDENCIAS DEL DAG
# ===============================

# Estructura de dependencias:
# inicio -> [dimensiones en paralelo] -> sincronización -> tabla_hechos -> validación -> métricas -> notificación -> fin

# Inicio del proceso
inicio >> [tarea_dim_tiempo, tarea_dim_vehiculo, tarea_dim_transaccion, tarea_dim_ubicacion]

# Sincronización de dimensiones
[tarea_dim_tiempo, tarea_dim_vehiculo, tarea_dim_transaccion, tarea_dim_ubicacion] >> sincronizacion_dimensiones

# Tabla de hechos (después de que todas las dimensiones estén listas)
sincronizacion_dimensiones >> tarea_fact_registro

# Validación y métricas
tarea_fact_registro >> tarea_validacion >> tarea_metricas >> tarea_notificacion >> finalizacion

# ===============================
# CONFIGURACIÓN ADICIONAL DEL DAG
# ===============================

# Configurar el DAG para logging detallado
dag.doc_md = """
# DAG ETL SRI Vehículos

Este DAG implementa un proceso ETL completo para los datos vehiculares del SRI.

## Estructura del Proceso:

1. **Dimensiones (Paralelo)**:
   - `dim_tiempo`: Genera calendario completo 2020-2025
   - `dim_vehiculo`: Extrae características únicas de vehículos
   - `dim_transaccion`: Mapea tipos de transacciones
   - `dim_ubicacion`: Mapea códigos de cantón a geografía

2. **Tabla de Hechos**:
   - `fact_registro_vehiculos`: Combina todas las dimensiones con métricas

3. **Validación y Monitoreo**:
   - Validación de calidad de datos
   - Generación de métricas de negocio
   - Notificaciones de finalización

## Configuración Requerida:

- PROJECT_ID: ID del proyecto de Google Cloud
- DATASET_ID: Nombre del dataset en BigQuery
- BUCKET_NAME: Nombre del bucket de Cloud Storage
- Service Account con permisos de BigQuery y Cloud Storage

## Archivos Requeridos:

- `gs://[BUCKET_NAME]/raw-data/sri_vehiculos.csv`

## Tablas Generadas:

- `dim_tiempo`
- `dim_vehiculo`
- `dim_transaccion`
- `dim_ubicacion`
- `fact_registro_vehiculos`
"""

# Configurar tags adicionales para organización
dag.tags.extend(['data-warehouse', 'gobierno', 'vehiculos'])

if __name__ == "__main__":
    dag.test()

OperationalError: (sqlite3.OperationalError) no such table: task_instance
[SQL: SELECT task_instance.try_number, task_instance.task_id, task_instance.dag_id, task_instance.run_id, task_instance.map_index, task_instance.start_date, task_instance.end_date, task_instance.duration, task_instance.state, task_instance.max_tries, task_instance.hostname, task_instance.unixname, task_instance.job_id, task_instance.pool, task_instance.pool_slots, task_instance.queue, task_instance.priority_weight, task_instance.operator, task_instance.custom_operator_name, task_instance.queued_dttm, task_instance.queued_by_job_id, task_instance.pid, task_instance.executor_config, task_instance.updated_at, task_instance.external_executor_id, task_instance.trigger_id, task_instance.trigger_timeout, task_instance.next_method, task_instance.next_kwargs, dag_run_1.state AS state_1, dag_run_1.id, dag_run_1.dag_id AS dag_id_1, dag_run_1.queued_at, dag_run_1.execution_date, dag_run_1.start_date AS start_date_1, dag_run_1.end_date AS end_date_1, dag_run_1.run_id AS run_id_1, dag_run_1.creating_job_id, dag_run_1.external_trigger, dag_run_1.run_type, dag_run_1.conf, dag_run_1.data_interval_start, dag_run_1.data_interval_end, dag_run_1.last_scheduling_decision, dag_run_1.dag_hash, dag_run_1.log_template_id, dag_run_1.updated_at AS updated_at_1 
FROM task_instance JOIN dag_run ON dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = task_instance.run_id JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id 
WHERE task_instance.dag_id = ? AND task_instance.task_id IN (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AND dag_run.execution_date >= ? AND dag_run.execution_date <= ? AND task_instance.operator = ?]
[parameters: ('sri_vehiculos_etl_proceso', 'inicio_proceso_etl', 'etl_dim_tiempo', 'etl_dim_vehiculo', 'etl_dim_transaccion', 'etl_dim_ubicacion', 'sincronizacion_dimensiones', 'etl_fact_registro_vehiculos', 'finalizacion_proceso_etl', 'validar_calidad_datos', 'generar_metricas_negocio', 'notificar_finalizacion', '2025-07-02 05:07:54.590300', '2025-07-02 05:07:54.590300', 'ExternalTaskMarker')]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

In [36]:
# Celda 9: Ejecutar ETL de dimensiones
print("🚀 Ejecutando ETL de dimensiones...")

# Ejecutar cada dimensión
try:
    resultado_tiempo = etl_dim_tiempo()
    print(f"1️⃣ {resultado_tiempo}")
except Exception as e:
    print(f"❌ Error en dim_tiempo: {e}")

try:
    resultado_vehiculo = etl_dim_vehiculo()
    print(f"2️⃣ {resultado_vehiculo}")
except Exception as e:
    print(f"❌ Error en dim_vehiculo: {e}")

try:
    resultado_transaccion = etl_dim_transaccion()
    print(f"3️⃣ {resultado_transaccion}")
except Exception as e:
    print(f"❌ Error en dim_transaccion: {e}")

try:
    resultado_ubicacion = etl_dim_ubicacion()
    print(f"4️⃣ {resultado_ubicacion}")
except Exception as e:
    print(f"❌ Error en dim_ubicacion: {e}")

print("✅ Dimensiones completadas")


🚀 Ejecutando ETL de dimensiones...
[[34m2025-07-02T05:09:08.525+0000[0m] {[34mipython-input-35-949037698.py:[0m56} INFO[0m - 🕐 Iniciando ETL para Dim_Tiempo...[0m
[[34m2025-07-02T05:09:08.648+0000[0m] {[34mipython-input-35-949037698.py:[0m66} INFO[0m - 📅 Generando 2192 registros de fechas...[0m
[[34m2025-07-02T05:09:15.092+0000[0m] {[34mipython-input-35-949037698.py:[0m116} INFO[0m - ✅ Cargados 2192 registros en dim_tiempo[0m
1️⃣ Dim_Tiempo cargada exitosamente: 2192 registros
[[34m2025-07-02T05:09:15.094+0000[0m] {[34mipython-input-35-949037698.py:[0m129} INFO[0m - 🚗 Iniciando ETL para Dim_Vehiculo...[0m
[[34m2025-07-02T05:09:19.223+0000[0m] {[34mipython-input-35-949037698.py:[0m143} INFO[0m - 📊 Datos extraídos: 460550 registros originales[0m
[[34m2025-07-02T05:09:21.910+0000[0m] {[34mipython-input-35-949037698.py:[0m196} INFO[0m - 🔧 Transformación completada: 331160 vehículos únicos[0m
[[34m2025-07-02T05:09:27.789+0000[0m] {[34mipython-input-35-

In [37]:
# Celda 10: Ejecutar ETL de tabla de hechos
print("📊 Ejecutando ETL de tabla de hechos...")

try:
    resultado_fact = etl_fact_registro_vehiculos()
    print(f"✅ {resultado_fact}")
except Exception as e:
    print(f"❌ Error en fact table: {e}")

📊 Ejecutando ETL de tabla de hechos...
[[34m2025-07-02T05:09:49.220+0000[0m] {[34mipython-input-35-949037698.py:[0m401} INFO[0m - 📊 Iniciando ETL para Fact_RegistroVehiculos...[0m
[[34m2025-07-02T05:09:54.546+0000[0m] {[34mipython-input-35-949037698.py:[0m414} INFO[0m - 📊 Datos extraídos: 460550 registros de hechos[0m
[[34m2025-07-02T05:09:54.549+0000[0m] {[34mipython-input-35-949037698.py:[0m417} INFO[0m - 🔍 Cargando dimensiones para lookups...[0m
[[34m2025-07-02T05:09:56.236+0000[0m] {[34mipython-input-35-949037698.py:[0m439} ERROR[0m - Error cargando dimensiones: 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'[0m
[[34m2025-07-02T05:09:56.239+0000[0m] {[34mipython-input-35-949037698.py:[0m586} ERROR[0m - ❌ Error en ETL Fact_RegistroVehiculos: 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'[0m
❌ Error en fact table: 4

In [38]:
# Celda 11: Ejecutar validaciones y métricas
print("🔍 Ejecutando validaciones...")

try:
    validacion = validar_calidad_datos()
    print("✅ Validación completada")
except Exception as e:
    print(f"❌ Error en validación: {e}")

try:
    metricas = generar_metricas_negocio()
    print("✅ Métricas generadas")
except Exception as e:
    print(f"❌ Error en métricas: {e}")

🔍 Ejecutando validaciones...
[[34m2025-07-02T05:10:27.669+0000[0m] {[34mipython-input-35-949037698.py:[0m652} INFO[0m - 🔍 Iniciando validación de calidad de datos...[0m
[[34m2025-07-02T05:10:28.926+0000[0m] {[34mipython-input-35-949037698.py:[0m759} ERROR[0m - ❌ Error en validación de calidad: 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'[0m
❌ Error en validación: 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'
[[34m2025-07-02T05:10:28.928+0000[0m] {[34mipython-input-35-949037698.py:[0m767} INFO[0m - 📈 Generando métricas de negocio...[0m
[[34m2025-07-02T05:10:29.308+0000[0m] {[34mipython-input-35-949037698.py:[0m843} ERROR[0m - ❌ Error generando métricas: 400 Name MontoAvaluo not found inside f at [5:19]; reason: invalidQuery, location: query, message: Name MontoAvaluo not found inside f at [5:19]

Location: us-central1
Job

In [39]:
# Celda 12: Verificar tablas creadas
from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)

tablas_esperadas = ['dim_tiempo', 'dim_vehiculo', 'dim_transaccion', 'dim_ubicacion', 'fact_registro_vehiculos']

print("📋 Verificando tablas creadas:")
for tabla in tablas_esperadas:
    try:
        query = f"SELECT COUNT(*) as total FROM `{PROJECT_ID}.{DATASET_ID}.{tabla}`"
        result = client.query(query).to_dataframe()
        total = result.iloc[0]['total']
        print(f"✅ {tabla}: {total:,} registros")
    except Exception as e:
        print(f"❌ {tabla}: Error - {e}")

📋 Verificando tablas creadas:
❌ dim_tiempo: Error - 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'
❌ dim_vehiculo: Error - 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'
❌ dim_transaccion: Error - 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'
❌ dim_ubicacion: Error - 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'
❌ fact_registro_vehiculos: Error - 403 request failed: the user does not have 'bigquery.readsessions.create' permission for 'projects/sri-vehiculos-etl'


In [40]:
# Celda 13: Consulta de ejemplo del data warehouse
query_ejemplo = f"""
SELECT
    t.Anio,
    v.Marca,
    u.Provincia,
    COUNT(*) as total_registros,
    AVG(f.MontoAvaluo) as avaluo_promedio
FROM `{PROJECT_ID}.{DATASET_ID}.fact_registro_vehiculos` f
INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_tiempo` t ON f.ID_Tiempo = t.ID_Tiempo
INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_vehiculo` v ON f.ID_Vehiculo = v.ID_Vehiculo
INNER JOIN `{PROJECT_ID}.{DATASET_ID}.dim_ubicacion` u ON f.ID_Ubicacion = u.ID_Ubicacion
GROUP BY t.Anio, v.Marca, u.Provincia
ORDER BY total_registros DESC
LIMIT 10
"""

try:
    resultado = client.query(query_ejemplo).to_dataframe()
    print("📊 Top 10 combinaciones Año-Marca-Provincia:")
    print(resultado.to_string(index=False))
except Exception as e:
    print(f"❌ Error en consulta: {e}")

❌ Error en consulta: 400 Name MontoAvaluo not found inside f at [7:11]; reason: invalidQuery, location: query, message: Name MontoAvaluo not found inside f at [7:11]

Location: us-central1
Job ID: b0f5d031-ddbd-45b1-bd36-7de68c443629

