# 📊 Creación de Tablas Consolidadas - pph-central.bronze

## Objetivo
Crear tablas consolidadas optimizadas en `pph-central.bronze` usando las vistas Silver de cada compañía.

## Flujo
1. **Análisis** - Ver qué tablas tienen vistas Silver disponibles
2. **Metadatos** - Revisar configuración de particionado/clusterizado  
3. **Prueba** - Crear una tabla consolidada como ejemplo
4. **Validación** - Verificar estructura y datos
5. **Escalado** - Crear todas las tablas restantes

## Ventajas del Notebook
✅ Sin timeouts de Cloud Shell  
✅ Ejecución query por query  
✅ Visualización inmediata de resultados  
✅ Debugging fácil  
✅ Reutilizable para futuras ejecuciones


In [None]:
# 📋 CONFIGURACIÓN Y SETUP CON AUTENTICACIÓN
import pandas as pd
from google.cloud import bigquery
from google.auth import default
import os

# Configuración
PROJECT_CENTRAL = 'pph-central'
PROJECT_SOURCE = 'platform-partners-pro'
DATASET_BRONZE = 'bronze'
DATASET_SILVER = 'silver'
DATASET_SETTINGS = 'settings'
DATASET_MANAGEMENT = 'management'

# 🔐 AUTENTICACIÓN EXPLÍCITA
try:
    # Intentar autenticación por defecto
    credentials, project = default()
    
    # Crear cliente con autenticación explícita
    client = bigquery.Client(project=PROJECT_CENTRAL, credentials=credentials)
    
    print("🔐 AUTENTICACIÓN:")
    print(f"   ✅ Credenciales obtenidas correctamente")
    print(f"   📧 Usuario: {credentials.service_account_email if hasattr(credentials, 'service_account_email') else 'Usuario actual'}")
    print(f"   🎯 Proyecto activo: {PROJECT_CENTRAL}")
    
except Exception as e:
    print(f"❌ Error de autenticación: {str(e)}")
    print("💡 Solución: Ejecuta 'gcloud auth login' en Cloud Shell antes de abrir el notebook")
    client = None

print("\n🔧 CONFIGURACIÓN:")
print(f"   Proyecto Central: {PROJECT_CENTRAL}")
print(f"   Proyecto Source: {PROJECT_SOURCE}")
print(f"   Dataset Bronze: {DATASET_BRONZE}")
print(f"   Dataset Silver: {DATASET_SILVER}")
print(f"   Dataset Settings: {DATASET_SETTINGS}")
print(f"   Dataset Management: {DATASET_MANAGEMENT}")

if client:
    print("\n✅ Setup completado - Cliente BigQuery inicializado con autenticación")
else:
    print("\n❌ Setup fallido - Problema de autenticación")


## 🔍 PASO 1: Análisis de Tablas Disponibles

Verificamos qué tablas tienen vistas Silver exitosas y cuántas compañías están disponibles para cada una.


In [None]:
# 📊 ANÁLISIS: Tablas con Vistas Silver Disponibles
query_analysis = f"""
SELECT 
  table_name,
  COUNT(*) as companies_with_silver_views,
  STRING_AGG(CAST(company_id AS STRING), ', ' ORDER BY company_id) as company_ids,
  STRING_AGG(company_name, ', ' ORDER BY company_id) as company_names
FROM (
  SELECT DISTINCT
    cc.table_name,
    cc.company_id,
    c.company_name
  FROM `{PROJECT_SOURCE}.{DATASET_SETTINGS}.companies_consolidated` cc
  JOIN `{PROJECT_SOURCE}.{DATASET_SETTINGS}.companies` c
    ON cc.company_id = c.company_id
  WHERE cc.consolidated_status = 1  -- Solo vistas Silver exitosas
    AND c.company_fivetran_status = TRUE
    AND c.company_bigquery_status = TRUE
)
GROUP BY table_name
ORDER BY companies_with_silver_views DESC, table_name
"""

# Ejecutar query y mostrar resultados
try:
    df_analysis = client.query(query_analysis).to_dataframe()
    print("📊 TABLAS CON VISTAS SILVER DISPONIBLES:")
    print("=" * 80)
    print(df_analysis.to_string(index=False))
    
    # Guardar para uso posterior
    available_tables = df_analysis['table_name'].tolist()
    print(f"\n📋 Total de tablas disponibles: {len(available_tables)}")
    
except Exception as e:
    print(f"❌ Error en análisis: {str(e)}")
    available_tables = []


## ⚙️ PASO 2: Configuración de Metadatos

Revisamos la configuración de particionado y clusterizado para cada tabla.


In [None]:
# 📋 METADATOS: Configuración de Particionado y Clusterizado
query_metadata = f"""
SELECT 
  table_name,
  partition_fields,
  cluster_fields,
  update_strategy,
  created_at,
  updated_at
FROM `{PROJECT_CENTRAL}.{DATASET_MANAGEMENT}.metadata_consolidated_tables`
ORDER BY table_name
"""

try:
    df_metadata = client.query(query_metadata).to_dataframe()
    print("⚙️ CONFIGURACIÓN DE METADATOS:")
    print("=" * 100)
    print(df_metadata.to_string(index=False))
    
    # Crear diccionario para acceso rápido
    metadata_dict = {}
    for _, row in df_metadata.iterrows():
        metadata_dict[row['table_name']] = {
            'partition_fields': row['partition_fields'],
            'cluster_fields': row['cluster_fields'],
            'update_strategy': row['update_strategy']
        }
    
    print(f"\n📋 Metadatos cargados para {len(metadata_dict)} tablas")
    
except Exception as e:
    print(f"❌ Error cargando metadatos: {str(e)}")
    metadata_dict = {}


## 🧪 PASO 3: Prueba con Tabla Individual

Vamos a crear una tabla consolidada como ejemplo. Usaremos `appointment` que suele tener muchas compañías disponibles.

**Nota:** Cambia `appointment` por la tabla que quieras probar según los resultados del análisis anterior.


In [None]:
# 🔧 REGLAS DE NORMALIZACIÓN IDÉNTICAS AL SCRIPT generate_silver_views.py
def generate_cast_for_field(field_name, source_type, target_type):
    """
    Genera la expresión CAST apropiada para un campo
    IDÉNTICA a la función en generate_silver_views.py
    """
    if source_type == target_type:
        return field_name
    
    # Mapeo de conversiones seguras - IDÉNTICO AL SCRIPT
    safe_casts = {
        ('INT64', 'STRING'): f"CAST({field_name} AS STRING)",
        ('INT64', 'FLOAT64'): f"CAST({field_name} AS FLOAT64)",
        ('FLOAT64', 'STRING'): f"CAST({field_name} AS STRING)",
        # 🚨 CORREGIDO: STRING a INT64/FLOAT64 NO es seguro si contiene letras
        ('STRING', 'INT64'): f"CAST({field_name} AS STRING)",  # Mantener como STRING
        ('STRING', 'FLOAT64'): f"CAST({field_name} AS STRING)",  # Mantener como STRING
        ('STRING', 'BOOL'): f"SAFE_CAST({field_name} AS BOOL)",
        ('BOOL', 'STRING'): f"CAST({field_name} AS STRING)",
        ('DATE', 'STRING'): f"CAST({field_name} AS STRING)",
        ('DATETIME', 'STRING'): f"CAST({field_name} AS STRING)",
        ('TIMESTAMP', 'STRING'): f"CAST({field_name} AS STRING)",
        # 🚨 CRÍTICO: TIMESTAMP vs INT64 - IDÉNTICO AL SCRIPT
        ('INT64', 'TIMESTAMP'): f"TIMESTAMP_SECONDS({field_name})",
        ('TIMESTAMP', 'INT64'): f"UNIX_SECONDS({field_name})",
        # JSON a otros tipos - IDÉNTICO AL SCRIPT
        ('JSON', 'STRING'): f"COALESCE(TO_JSON_STRING({field_name}), '')",
        ('JSON', 'INT64'): f"COALESCE(TO_JSON_STRING({field_name}), '')",  # A STRING
        ('JSON', 'FLOAT64'): f"COALESCE(TO_JSON_STRING({field_name}), '')"  # A STRING
    }
    
    cast_key = (source_type, target_type)
    if cast_key in safe_casts:
        return safe_casts[cast_key]
    
    # Para conversiones no seguras, usar SAFE_CAST con valor por defecto - IDÉNTICO AL SCRIPT
    defaults = {
        'STRING': "''",
        'INT64': '0',
        'FLOAT64': '0.0',
        'BOOL': 'FALSE',
        'DATE': 'NULL',
        'DATETIME': 'NULL',
        'TIMESTAMP': 'NULL',
        'JSON': 'NULL',
        'BYTES': 'NULL'
    }
    default_value = defaults.get(target_type, 'NULL')
    
    return f"COALESCE(SAFE_CAST({field_name} AS {target_type}), {default_value})"

# 🔍 FUNCIÓN: Obtener estructura de vista Silver de una compañía específica
def get_silver_view_structure(project_id, table_name):
    """Obtiene la estructura de una vista Silver específica"""
    try:
        query = f"""
        SELECT 
          column_name,
          ordinal_position,
          data_type,
          is_nullable
        FROM `{project_id}.{DATASET_SILVER}.INFORMATION_SCHEMA.COLUMNS`
        WHERE table_name = 'vw_{table_name}'
        ORDER BY ordinal_position
        """
        
        df_structure = client.query(query).to_dataframe()
        return df_structure
    except Exception as e:
        print(f"❌ Error obteniendo estructura de {project_id}.{DATASET_SILVER}.vw_{table_name}: {str(e)}")
        return pd.DataFrame()

print("✅ Reglas de normalización sincronizadas con generate_silver_views.py")


## ⚠️ IMPORTANTE: Particionamiento por MES

**Cambio aplicado:** Las tablas se particionan por **MES** (`DATE_TRUNC(created_on, MONTH)`) en lugar de día.

**Razón:** BigQuery tiene un límite de 4000 particiones por tabla. Con particionamiento por día:
- 4000 días = ~11 años de datos
- Si tienes datos históricos más antiguos → Error

**Con particionamiento por MES:**
- 4000 meses = ~333 años de datos ✅
- Reduce particiones significativamente
- Mejor rendimiento para consultas mensuales/anuales

**Si necesitas particionamiento por día:**
1. Filtra datos históricos (ej: últimos 5 años)
2. O usa particionamiento por YEAR para datos históricos


In [None]:
# 🔍 FUNCIÓN: Obtener Compañías para una Tabla Específica
def get_companies_for_table(table_name):
    """Obtiene las compañías disponibles para una tabla específica"""
    query = f"""
    SELECT 
      c.company_id,
      c.company_name,
      c.company_project_id
    FROM `{PROJECT_SOURCE}.{DATASET_SETTINGS}.companies_consolidated` cc
    JOIN `{PROJECT_SOURCE}.{DATASET_SETTINGS}.companies` c
      ON cc.company_id = c.company_id
    WHERE cc.table_name = '{table_name}'
      AND cc.consolidated_status = 1  -- Solo vistas Silver exitosas
      AND c.company_fivetran_status = TRUE
      AND c.company_bigquery_status = TRUE
    ORDER BY c.company_id
    """
    
    try:
        df_companies = client.query(query).to_dataframe()
        return df_companies
    except Exception as e:
        print(f"❌ Error obteniendo compañías para {table_name}: {str(e)}")
        return pd.DataFrame()

# 🧪 PRUEBA: Ver Compañías para appointment (o cambiar por otra tabla)
test_table = 'appointment'  # Cambiar por la tabla que quieras probar
print(f"🔍 COMPAÑÍAS DISPONIBLES PARA: {test_table}")
print("=" * 60)

df_companies = get_companies_for_table(test_table)
if not df_companies.empty:
    print(df_companies.to_string(index=False))
    print(f"\n📋 Total de compañías: {len(df_companies)}")
else:
    print(f"⚠️  No hay compañías disponibles para {test_table}")
    print("💡 Prueba con otra tabla de la lista anterior")


In [None]:
# 🏗️ FUNCIÓN: Crear Tabla Consolidada
def create_consolidated_table(table_name, companies_df):
    """Crea una tabla consolidada para una tabla específica"""
    
    if companies_df.empty:
        print(f"❌ No hay compañías disponibles para {table_name}")
        return False
    
    # Obtener metadatos
    if table_name in metadata_dict:
        metadata = metadata_dict[table_name]
        partition_field = metadata['partition_fields'][0]
        cluster_fields = metadata['cluster_fields']
    else:
        # Valores por defecto
        partition_field = 'created_on'
        cluster_fields = ['company_id']
    
    # Construir UNION ALL parts
    union_parts = []
    for _, company in companies_df.iterrows():
        union_part = f"""
        SELECT 
          '{company['company_project_id']}' AS company_project_id,
          {company['company_id']} AS company_id,
          *
        FROM `{company['company_project_id']}.{DATASET_SILVER}.vw_{table_name}`"""
        union_parts.append(union_part)
    
    # Configurar clusterizado
    cluster_sql = f"CLUSTER BY {', '.join(cluster_fields)}" if cluster_fields else ""
    
    # CRÍTICO: Usar MONTH para evitar límite de 4000 particiones
    # Si tienes más de 11 años de datos, cambia a YEAR
    # SQL completo
    create_sql = f"""
    CREATE OR REPLACE TABLE `{PROJECT_CENTRAL}.{DATASET_BRONZE}.consolidated_{table_name}`
    PARTITION BY DATE_TRUNC({partition_field}, MONTH)
    {cluster_sql}
    AS
    {' UNION ALL '.join(union_parts)}
    """
    
    print(f"🔄 Creando tabla consolidada: consolidated_{table_name}")
    print(f"📊 Compañías: {len(companies_df)}")
    print(f"⚙️ Particionado: {partition_field}")
    print(f"🔗 Clusterizado: {cluster_fields}")
    
    try:
        query_job = client.query(create_sql)
        query_job.result()
        print(f"✅ Tabla creada exitosamente: {PROJECT_CENTRAL}.{DATASET_BRONZE}.consolidated_{table_name}")
        return True
    except Exception as e:
        print(f"❌ Error creando tabla: {str(e)}")
        return False

# 🧪 PRUEBA: Crear tabla consolidada para la tabla de prueba
if not df_companies.empty:
    success = create_consolidated_table(test_table, df_companies)
    if success:
        print(f"\n🎉 ¡Tabla {test_table} creada exitosamente!")
    else:
        print(f"\n❌ Error creando tabla {test_table}")
else:
    print(f"\n⚠️  No se puede crear tabla para {test_table} - no hay compañías disponibles")


## ✅ PASO 4: Validación de Tabla Creada

Verificamos que la tabla se creó correctamente y tiene los datos esperados.


In [None]:
# 📊 VALIDACIÓN: Información de la Tabla Creada
def validate_table(table_name):
    """Valida que la tabla consolidada se creó correctamente"""
    
    # Información básica de la tabla
    query_info = f"""
    SELECT 
      table_name,
      table_type,
      row_count,
      size_bytes,
      creation_time,
      last_modified_time
    FROM `{PROJECT_CENTRAL}.{DATASET_BRONZE}.INFORMATION_SCHEMA.TABLES`
    WHERE table_name = 'consolidated_{table_name}'
    """
    
    try:
        df_info = client.query(query_info).to_dataframe()
        if not df_info.empty:
            print(f"📊 INFORMACIÓN DE LA TABLA: consolidated_{table_name}")
            print("=" * 80)
            print(df_info.to_string(index=False))
            return True
        else:
            print(f"❌ Tabla consolidated_{table_name} no encontrada")
            return False
    except Exception as e:
        print(f"❌ Error validando tabla: {str(e)}")
        return False

# Validar la tabla de prueba
if 'test_table' in locals():
    validate_table(test_table)
else:
    print("⚠️  Ejecuta primero la celda de creación de tabla")


In [None]:
# 🔍 VALIDACIÓN: Estructura y Datos de la Tabla
def show_table_structure(table_name):
    """Muestra la estructura de la tabla consolidada"""
    
    query_columns = f"""
    SELECT 
      column_name,
      ordinal_position,
      data_type,
      is_nullable
    FROM `{PROJECT_CENTRAL}.{DATASET_BRONZE}.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name = 'consolidated_{table_name}'
    ORDER BY ordinal_position
    """
    
    try:
        df_columns = client.query(query_columns).to_dataframe()
        if not df_columns.empty:
            print(f"🏗️ ESTRUCTURA DE LA TABLA: consolidated_{table_name}")
            print("=" * 80)
            print(df_columns.to_string(index=False))
            return True
        else:
            print(f"❌ No se encontraron columnas para consolidated_{table_name}")
            return False
    except Exception as e:
        print(f"❌ Error obteniendo estructura: {str(e)}")
        return False

# Mostrar estructura de la tabla de prueba
if 'test_table' in locals():
    show_table_structure(test_table)
else:
    print("⚠️  Ejecuta primero la celda de creación de tabla")


In [None]:
# 📈 VALIDACIÓN: Distribución por Compañía
def show_company_distribution(table_name):
    """Muestra la distribución de datos por compañía"""
    
    # Determinar campo de particionado
    if table_name in metadata_dict:
        partition_field = metadata_dict[table_name]['partition_fields'][0]
    else:
        partition_field = 'created_on'
    
    query_distribution = f"""
    SELECT 
      company_project_id,
      company_id,
      COUNT(*) as record_count,
      MIN({partition_field}) as earliest_record,
      MAX({partition_field}) as latest_record
    FROM `{PROJECT_CENTRAL}.{DATASET_BRONZE}.consolidated_{table_name}`
    GROUP BY company_project_id, company_id
    ORDER BY company_id
    """
    
    try:
        df_distribution = client.query(query_distribution).to_dataframe()
        if not df_distribution.empty:
            print(f"📈 DISTRIBUCIÓN POR COMPAÑÍA: consolidated_{table_name}")
            print("=" * 100)
            print(df_distribution.to_string(index=False))
            
            total_records = df_distribution['record_count'].sum()
            print(f"\n📊 Total de registros: {total_records:,}")
            print(f"📊 Compañías incluidas: {len(df_distribution)}")
            return True
        else:
            print(f"❌ No se encontraron datos para consolidated_{table_name}")
            return False
    except Exception as e:
        print(f"❌ Error obteniendo distribución: {str(e)}")
        return False

# Mostrar distribución de la tabla de prueba
if 'test_table' in locals():
    show_company_distribution(test_table)
else:
    print("⚠️  Ejecuta primero la celda de creación de tabla")


# 🚀 CREAR TODAS LAS TABLAS RESTANTES
def create_all_consolidated_tables():
    """Crea todas las tablas consolidadas disponibles"""
    
    if not available_tables:
        print("❌ No hay tablas disponibles. Ejecuta primero el análisis.")
        return
    
    print(f"🚀 CREANDO TODAS LAS TABLAS CONSOLIDADAS")
    print(f"📋 Total de tablas: {len(available_tables)}")
    print("=" * 80)
    
    success_count = 0
    error_count = 0
    
    for i, table_name in enumerate(available_tables, 1):
        print(f"\n📊 Procesando {i}/{len(available_tables)}: {table_name}")
        
        # Obtener compañías para esta tabla
        companies_df = get_companies_for_table(table_name)
        
        if companies_df.empty:
            print(f"  ⚠️  Sin compañías disponibles - SALTAR")
            continue
        
        # Crear tabla consolidada
        success = create_consolidated_table(table_name, companies_df)
        
        if success:
            success_count += 1
            print(f"  ✅ Tabla {table_name} creada exitosamente")
        else:
            error_count += 1
            print(f"  ❌ Error creando tabla {table_name}")
    
    print(f"\n🎯 RESUMEN FINAL:")
    print(f"✅ Tablas creadas exitosamente: {success_count}")
    print(f"❌ Tablas con errores: {error_count}")
    print(f"📊 Total procesadas: {success_count + error_count}")

# 🧪 EJECUTAR CREACIÓN DE TODAS LAS TABLAS
# Descomenta la siguiente línea para ejecutar todas las tablas:
# create_all_consolidated_tables()

print("💡 Para crear todas las tablas, descomenta la línea: create_all_consolidated_tables()")
print("📋 Tablas disponibles para procesar:")
for table in available_tables:
    print(f"   - {table}")


## 📋 PASO 6: Verificación Final

Verificamos que todas las tablas consolidadas se crearon correctamente.


In [None]:
# 📊 RESUMEN: Todas las Tablas Consolidadas Creadas
def show_final_summary():
    """Muestra el resumen final de todas las tablas consolidadas"""
    
    query_summary = f"""
    SELECT 
      table_name,
      table_type,
      row_count,
      ROUND(size_bytes / 1024 / 1024, 2) as size_mb,
      creation_time,
      last_modified_time
    FROM `{PROJECT_CENTRAL}.{DATASET_BRONZE}.INFORMATION_SCHEMA.TABLES`
    WHERE table_name LIKE 'consolidated_%'
    ORDER BY row_count DESC
    """
    
    try:
        df_summary = client.query(query_summary).to_dataframe()
        if not df_summary.empty:
            print("📊 RESUMEN FINAL - TABLAS CONSOLIDADAS CREADAS")
            print("=" * 120)
            print(df_summary.to_string(index=False))
            
            total_tables = len(df_summary)
            total_rows = df_summary['row_count'].sum()
            total_size = df_summary['size_mb'].sum()
            
            print(f"\n🎯 ESTADÍSTICAS FINALES:")
            print(f"📊 Total de tablas consolidadas: {total_tables}")
            print(f"📊 Total de registros: {total_rows:,}")
            print(f"📊 Tamaño total: {total_size:.2f} MB")
            
            return True
        else:
            print("❌ No se encontraron tablas consolidadas")
            return False
    except Exception as e:
        print(f"❌ Error obteniendo resumen: {str(e)}")
        return False

# Mostrar resumen final
show_final_summary()


## 🎯 PRÓXIMOS PASOS

### ✅ Completado:
- Tablas consolidadas creadas en `pph-central.bronze`
- Optimizadas con particionado y clusterizado
- Datos de todas las compañías unificados

### 🔄 Siguiente Paso:
**Crear vistas consolidadas en `pph-central.silver`** que apunten a las tablas bronze creadas.

### 📝 Notas:
- Guarda este notebook como referencia
- Puedes reutilizarlo para futuras actualizaciones
- Los datos se actualizarán cada 6 horas según el scheduling


## 🚨 INVESTIGACIÓN CRÍTICA: ERROR DE NORMALIZACIÓN EN VISTAS SILVER

**PROBLEMA GRAVE:** Las vistas Silver NO están normalizando tipos de datos correctamente.
**ERROR:** Column 7 has incompatible types: TIMESTAMP, INT64, TIMESTAMP...

**INVESTIGACIÓN INMEDIATA:**


In [None]:
# 🚨 INVESTIGACIÓN: Comparar Estructuras de Vistas Silver por Compañía
def investigate_silver_views_structure(table_name):
    """Investiga las diferencias en estructura de vistas Silver entre compañías"""
    
    # Obtener compañías disponibles
    companies_df = get_companies_for_table(table_name)
    if companies_df.empty:
        print(f"❌ No hay compañías para {table_name}")
        return
    
    print(f"🔍 INVESTIGANDO ESTRUCTURA DE VISTAS SILVER: {table_name}")
    print("=" * 100)
    
    # Analizar estructura de cada compañía
    structures = {}
    
    for _, company in companies_df.iterrows():
        project_id = company['company_project_id']
        company_name = company['company_name']
        
        try:
            # Query para obtener estructura de la vista Silver
            query_structure = f"""
            SELECT 
              column_name,
              ordinal_position,
              data_type,
              is_nullable
            FROM `{project_id}.{DATASET_SILVER}.INFORMATION_SCHEMA.COLUMNS`
            WHERE table_name = 'vw_{table_name}'
            ORDER BY ordinal_position
            """
            
            df_structure = client.query(query_structure).to_dataframe()
            
            if not df_structure.empty:
                structures[company_name] = df_structure
                print(f"✅ {company_name}: {len(df_structure)} columnas")
            else:
                print(f"❌ {company_name}: Vista no encontrada")
                
        except Exception as e:
            print(f"❌ {company_name}: Error - {str(e)}")
    
    return structures

# Investigar appointment (tabla que falló)
print("🚨 INVESTIGACIÓN CRÍTICA - TABLA: appointment")
structures = investigate_silver_views_structure('appointment')


In [None]:
# 🚨 ANÁLISIS: Identificar Columnas con Tipos Incompatibles
def find_type_conflicts(structures):
    """Identifica columnas con tipos de datos incompatibles entre compañías"""
    
    if not structures:
        print("❌ No hay estructuras para analizar")
        return
    
    print("\n🔍 ANÁLISIS DE CONFLICTOS DE TIPOS:")
    print("=" * 100)
    
    # Obtener todas las columnas únicas
    all_columns = set()
    for company, df in structures.items():
        all_columns.update(df['column_name'].tolist())
    
    # Analizar cada columna
    conflicts = []
    
    for column in sorted(all_columns):
        column_types = {}
        
        for company, df in structures.items():
            col_data = df[df['column_name'] == column]
            if not col_data.empty:
                data_type = col_data.iloc[0]['data_type']
                column_types[company] = data_type
        
        # Verificar si hay conflictos
        unique_types = set(column_types.values())
        if len(unique_types) > 1:
            conflicts.append({
                'column': column,
                'types': unique_types,
                'companies': column_types
            })
    
    if conflicts:
        print(f"❌ CONFLICTOS ENCONTRADOS: {len(conflicts)} columnas")
        print("\n🚨 COLUMNAS CON TIPOS INCOMPATIBLES:")
        
        for i, conflict in enumerate(conflicts, 1):
            print(f"\n{i}. COLUMNA: {conflict['column']}")
            print(f"   TIPOS: {', '.join(conflict['types'])}")
            print("   POR COMPAÑÍA:")
            for company, data_type in conflict['companies'].items():
                print(f"     - {company}: {data_type}")
    else:
        print("✅ NO HAY CONFLICTOS DE TIPOS")
    
    return conflicts

# Analizar conflictos en appointment
if structures:
    conflicts = find_type_conflicts(structures)
else:
    print("❌ No hay estructuras para analizar")


In [None]:
# 🚨 SOLUCIÓN INMEDIATA: Forzar Normalización en Bronze
def create_consolidated_table_with_normalization(table_name, companies_df):
    """Crea tabla consolidada FORZANDO normalización de tipos en bronze"""
    
    if companies_df.empty:
        print(f"❌ No hay compañías disponibles para {table_name}")
        return False
    
    # Obtener metadatos
    if table_name in metadata_dict:
        metadata = metadata_dict[table_name]
        partition_field = metadata['partition_fields'][0]
        cluster_fields = metadata['cluster_fields']
    else:
        partition_field = 'created_on'
        cluster_fields = ['company_id']
    
    print(f"🚨 SOLUCIÓN DE EMERGENCIA: Normalización forzada en bronze")
    print(f"🔄 Creando tabla consolidada: consolidated_{table_name}")
    print(f"📊 Compañías: {len(companies_df)}")
    
    # Construir UNION ALL con normalización forzada
    union_parts = []
    
    for _, company in companies_df.iterrows():
        # Aplicar normalización agresiva en el SELECT
        union_part = f"""
        SELECT 
          '{company['company_project_id']}' AS company_project_id,
          {company['company_id']} AS company_id,
          SAFE_CAST(id AS STRING) AS id,
          SAFE_CAST(appointment_type_id AS STRING) AS appointment_type_id,
          SAFE_CAST(technician_id AS STRING) AS technician_id,
          SAFE_CAST(job_id AS STRING) AS job_id,
          SAFE_CAST(customer_id AS STRING) AS customer_id,
          SAFE_CAST(created_on AS TIMESTAMP) AS created_on,
          SAFE_CAST(updated_on AS TIMESTAMP) AS updated_on,
          SAFE_CAST(start_time AS TIMESTAMP) AS start_time,
          SAFE_CAST(end_time AS TIMESTAMP) AS end_time,
          SAFE_CAST(status AS STRING) AS status,
          SAFE_CAST(notes AS STRING) AS notes,
          SAFE_CAST(address AS STRING) AS address,
          SAFE_CAST(city AS STRING) AS city,
          SAFE_CAST(state AS STRING) AS state,
          SAFE_CAST(zip AS STRING) AS zip,
          SAFE_CAST(phone AS STRING) AS phone,
          SAFE_CAST(email AS STRING) AS email,
          SAFE_CAST(confirmed AS BOOLEAN) AS confirmed,
          SAFE_CAST(completed AS BOOLEAN) AS completed,
          SAFE_CAST(cancelled AS BOOLEAN) AS cancelled,
          SAFE_CAST(no_show AS BOOLEAN) AS no_show,
          SAFE_CAST(rescheduled AS BOOLEAN) AS rescheduled,
          SAFE_CAST(arrival_window_start AS TIMESTAMP) AS arrival_window_start,
          SAFE_CAST(arrival_window_end AS TIMESTAMP) AS arrival_window_end
        FROM `{company['company_project_id']}.{DATASET_SILVER}.vw_{table_name}`"""
        union_parts.append(union_part)
    
    # Configurar clusterizado
    cluster_sql = f"CLUSTER BY {', '.join(cluster_fields)}" if cluster_fields else ""
    
    # SQL completo con normalización forzada
    create_sql = f"""
    CREATE OR REPLACE TABLE `{PROJECT_CENTRAL}.{DATASET_BRONZE}.consolidated_{table_name}`
    PARTITION BY DATE({partition_field})
    {cluster_sql}
    AS
    {' UNION ALL '.join(union_parts)}
    """
    
    try:
        print("🔄 Ejecutando creación con normalización forzada...")
        query_job = client.query(create_sql)
        query_job.result()
        print(f"✅ Tabla creada exitosamente: {PROJECT_CENTRAL}.{DATASET_BRONZE}.consolidated_{table_name}")
        return True
    except Exception as e:
        print(f"❌ Error creando tabla: {str(e)}")
        return False

# 🚨 PRUEBA DE SOLUCIÓN DE EMERGENCIA
print("🚨 APLICANDO SOLUCIÓN DE EMERGENCIA PARA appointment")
if not df_companies.empty:
    success = create_consolidated_table_with_normalization('appointment', df_companies)
    if success:
        print("🎉 ¡SOLUCIÓN APLICADA EXITOSAMENTE!")
    else:
        print("❌ Error aplicando solución de emergencia")
else:
    print("⚠️  No hay compañías disponibles para prueba")
