In [1]:
# === IMPORTS INTERNOS ===
from core.libs import pd, np
from core.db import get_engine

# === CONEXI√ìN ===
engine = get_engine()

üíª Conectado a la base de datos helloworldtree


In [2]:
"""
Verifica el estado actual de la migraci√≥n en la base de datos
"""

from typing import Dict, List, Tuple

# === IMPORTS INTERNOS ===
from core.libs import pd, np
from core.db import get_engine

# === CONEXI√ìN ===
engine = get_engine()

def check_table_existence() -> Dict[str, Tuple[bool, str]]:
    """Verifica qu√© tablas/views existen"""

    query = """
        SELECT table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = 'public'
        AND table_name IN ('survival_timeseries', 'survival_current',
                           'inventory_metrics', 'inventory_metrics_current')
    """

    df = pd.read_sql(query, engine)

    df = pd.read_sql(query, engine)

    objects_to_check = [
        'survival_timeseries',
        'survival_current',
        'inventory_metrics',
        'inventory_metrics_current'
    ]

    results = {}

    for obj_name in objects_to_check:
        row = df[df['table_name'] == obj_name]
        if not row.empty:
            obj_type = 'VIEW' if row.iloc[0]['table_type'] == 'VIEW' else 'TABLE'
            results[obj_name] = (True, obj_type)
        else:
            results[obj_name] = (False, 'NONE')

    return results


def check_indexes_on_deprecated() -> List[Dict]:
    """Busca √≠ndices en tablas/views deprecadas"""

    query = """
        SELECT
            schemaname,
            tablename,
            indexname,
            indexdef
        FROM pg_indexes
        WHERE schemaname = 'public'
        AND tablename IN ('survival_timeseries', 'survival_current')
        ORDER BY tablename, indexname
    """

    df = pd.read_sql(query, engine)

    return df.to_dict('records') if not df.empty else []


def check_constraints_on_deprecated() -> List[Dict]:
    """Busca constraints en tablas deprecadas"""

    query = """
        SELECT
            tc.table_name,
            tc.constraint_name,
            tc.constraint_type,
            kcu.column_name
        FROM information_schema.table_constraints tc
        LEFT JOIN information_schema.key_column_usage kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
        WHERE tc.table_schema = 'public'
        AND tc.table_name IN ('survival_timeseries', 'survival_current')
        ORDER BY tc.table_name, tc.constraint_type, tc.constraint_name
    """

    df = pd.read_sql(query, engine)

    return df.to_dict('records') if not df.empty else []


def check_foreign_keys_referencing_deprecated() -> List[Dict]:
    """Busca foreign keys que referencian tablas deprecadas"""

    query = """
        SELECT
            tc.table_name AS referencing_table,
            kcu.column_name AS referencing_column,
            ccu.table_name AS referenced_table,
            ccu.column_name AS referenced_column,
            tc.constraint_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
        JOIN information_schema.constraint_column_usage ccu
            ON ccu.constraint_name = tc.constraint_name
            AND ccu.table_schema = tc.table_schema
        WHERE tc.constraint_type = 'FOREIGN KEY'
        AND tc.table_schema = 'public'
        AND ccu.table_name IN ('survival_timeseries', 'survival_current')
        ORDER BY tc.table_name
    """

    df = pd.read_sql(query, engine)

    return df.to_dict('records') if not df.empty else []


def check_view_definitions() -> Dict[str, str]:
    """Obtiene definiciones de views relevantes"""

    query = """
        SELECT
            table_name,
            view_definition
        FROM information_schema.views
        WHERE table_schema = 'public'
        AND table_name IN ('survival_current', 'inventory_metrics_current')
    """

    df = pd.read_sql(query, engine)

    return dict(zip(df['table_name'], df['view_definition'])) if not df.empty else {}


def print_db_state_report(conn):
    """Imprime reporte completo del estado de la BD"""

    print("=" * 80)
    print("üóÑÔ∏è  ESTADO ACTUAL DE LA BASE DE DATOS")
    print("=" * 80)
    print()

    # 1. Existencia de objetos
    print("üìä OBJETOS EN LA BASE DE DATOS")
    print("-" * 80)
    existence = check_table_existence(conn)
    for obj_name, (exists, obj_type) in existence.items():
        if exists:
            icon = "‚úÖ" if obj_name.startswith('inventory_') else "‚ö†Ô∏è"
            print(f"{icon} {obj_name:30} ‚Üí {obj_type}")
        else:
            icon = "‚úÖ" if obj_name.startswith('survival_') else "‚ùå"
            print(f"{icon} {obj_name:30} ‚Üí NO EXISTE")
    print()

    # 2. √çndices problem√°ticos
    print("üîç √çNDICES EN TABLAS DEPRECADAS")
    print("-" * 80)
    indexes = check_indexes_on_deprecated(conn)
    if indexes:
        print("‚ö†Ô∏è  ENCONTRADOS (deben eliminarse si la tabla es VIEW):")
        for idx in indexes:
            print(f"  ‚õî {idx['table']}.{idx['index']}")
            print(f"     {idx['definition'][:80]}...")
    else:
        print("‚úÖ No hay √≠ndices en tablas deprecadas")
    print()

    # 3. Constraints problem√°ticos
    print("üîó CONSTRAINTS EN TABLAS DEPRECADAS")
    print("-" * 80)
    constraints = check_constraints_on_deprecated(conn)
    if constraints:
        print("‚ö†Ô∏è  ENCONTRADOS (deben migrarse):")
        for const in constraints:
            print(f"  ‚õî {const['table']}.{const['constraint']} ({const['type']})")
            if const['column']:
                print(f"     Columna: {const['column']}")
    else:
        print("‚úÖ No hay constraints en tablas deprecadas")
    print()

    # 4. Foreign keys apuntando a deprecadas
    print("üîó FOREIGN KEYS REFERENCIANDO TABLAS DEPRECADAS")
    print("-" * 80)
    fkeys = check_foreign_keys_referencing_deprecated(conn)
    if fkeys:
        print("‚ö†Ô∏è  ENCONTRADAS (deben actualizarse):")
        for fk in fkeys:
            print(f"  ‚õî {fk['from_table']}.{fk['from_column']}")
            print(f"     ‚Üí {fk['to_table']}.{fk['to_column']}")
            print(f"     Constraint: {fk['constraint']}")
    else:
        print("‚úÖ No hay FKs referenciando tablas deprecadas")
    print()

    # 5. Definiciones de views
    print("üëÅÔ∏è  DEFINICIONES DE VIEWS")
    print("-" * 80)
    views = check_view_definitions(conn)
    if views:
        for view_name, definition in views.items():
            print(f"üìå {view_name}:")
            print(f"   {definition[:200]}...")
            print()
    else:
        print("‚ö†Ô∏è  No se encontraron views relevantes")

    # 6. Resumen y acciones
    print("=" * 80)
    print("üìã ACCIONES REQUERIDAS")
    print("=" * 80)

    actions = []

    # Verificar cada problema
    if existence.get('survival_timeseries', (False, ''))[0]:
        actions.append("üîÑ MIGRAR survival_timeseries ‚Üí inventory_metrics")

    if existence.get('survival_current', (False, ''))[1] == 'TABLE':
        actions.append("üîÑ CONVERTIR survival_current de TABLE a VIEW")

    if indexes:
        actions.append("‚ùå ELIMINAR √≠ndices en survival_current (si es VIEW)")

    if constraints:
        actions.append("üîÑ MIGRAR constraints a nuevas tablas")

    if fkeys:
        actions.append("üîÑ ACTUALIZAR foreign keys a nuevas tablas")

    if not actions:
        print("‚úÖ ¬°Base de datos est√° correctamente migrada!")
    else:
        for i, action in enumerate(actions, 1):
            print(f"{i}. {action}")

    print()


def generate_migration_sql(conn):
    """Genera SQL para completar la migraci√≥n"""

    print("=" * 80)
    print("üõ†Ô∏è  SQL DE MIGRACI√ìN SUGERIDO")
    print("=" * 80)
    print()

    existence = check_table_existence(conn)
    indexes = check_indexes_on_deprecated(conn)
    constraints = check_constraints_on_deprecated(conn)
    fkeys = check_foreign_keys_referencing_deprecated(conn)

    sql_statements = []

    # 1. Eliminar √≠ndices en views
    if indexes:
        sql_statements.append("-- 1. Eliminar √≠ndices en tablas deprecadas (si son VIEWs)")
        for idx in indexes:
            if existence.get(idx['table'], (False, ''))[1] == 'VIEW':
                sql_statements.append(f"DROP INDEX IF EXISTS {idx['index']};")
        sql_statements.append("")

    # 2. Migrar constraints
    if constraints:
        sql_statements.append("-- 2. Migrar constraints (REQUIERE REVISI√ìN MANUAL)")
        for const in constraints:
            old_table = const['table']
            new_table = old_table.replace('survival_', 'inventory_metrics')
            sql_statements.append(f"-- TODO: Migrar {const['constraint']} de {old_table} a {new_table}")
        sql_statements.append("")

    # 3. Actualizar foreign keys
    if fkeys:
        sql_statements.append("-- 3. Actualizar foreign keys")
        for fk in fkeys:
            new_ref_table = fk['to_table'].replace('survival_', 'inventory_metrics')
            sql_statements.append(f"-- Actualizar FK en {fk['from_table']}")
            sql_statements.append(f"ALTER TABLE {fk['from_table']}")
            sql_statements.append(f"  DROP CONSTRAINT IF EXISTS {fk['constraint']};")
            sql_statements.append(f"ALTER TABLE {fk['from_table']}")
            sql_statements.append(f"  ADD CONSTRAINT {fk['constraint']}")
            sql_statements.append(f"  FOREIGN KEY ({fk['from_column']})")
            sql_statements.append(f"  REFERENCES {new_ref_table}({fk['to_column']});")
            sql_statements.append("")

    # 4. Recrear survival_current como VIEW si es tabla
    if existence.get('survival_current', (False, ''))[1] == 'TABLE':
        sql_statements.append("-- 4. Convertir survival_current a VIEW")
        sql_statements.append("DROP TABLE IF EXISTS survival_current CASCADE;")
        sql_statements.append("""
CREATE OR REPLACE VIEW survival_current AS
SELECT * FROM inventory_metrics_current;
""")

    if sql_statements:
        print("\n".join(sql_statements))
        print()
        print("‚ö†Ô∏è  ADVERTENCIA: Revisa y ajusta este SQL antes de ejecutarlo")
    else:
        print("‚úÖ No se requiere SQL de migraci√≥n")

    print()


if __name__ == "__main__":
    import sys

    # Configuraci√≥n de conexi√≥n (ajustar seg√∫n sea necesario)
    DB_CONFIG = {
        'dbname': 'tu_database',
        'user': 'tu_usuario',
        'password': 'tu_password',
        'host': 'localhost',
        'port': 5432
    }

    try:
        print("üîå Conectando a la base de datos...")
        conn = psycopg2.connect(**DB_CONFIG)
        print("‚úÖ Conexi√≥n exitosa")
        print()

        # Ejecutar diagn√≥stico
        print_db_state_report(conn)
        generate_migration_sql(conn)

        conn.close()

    except psycopg2.Error as e:
        print(f"‚ùå Error de base de datos: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"‚ùå Error: {e}")
        sys.exit(1)

üíª Conectado a la base de datos helloworldtree
üîå Conectando a la base de datos...


NameError: name 'psycopg2' is not defined