# An√°lisis Base de Conocimiento - Subrespuesta y Segmento

Este notebook procesa archivos de base de conocimiento:
- `_tbl_subrespuesta__PRD_baseconocimientosdb_202511201112.json`
- `_tbl_segmento__PRD_baseconocimientosdb_202511201112.json`

Objetivos:
1. Leer archivos JSON del ZIP
2. Limpiar HTML de columnas 'texto'
3. Cruzar por `id_pregunta` y consolidar textos
4. Analizar campo `activo`

In [None]:
import pandas as pd
import re
import json
import zipfile
from bs4 import BeautifulSoup

## 1. Cargar archivos JSON desde ZIP

In [None]:
def cargar_json_desde_zip(zip_path, archivos_objetivo):
    """
    Carga archivos JSON espec√≠ficos desde un ZIP.
    
    Args:
        zip_path (str): Ruta al archivo ZIP
        archivos_objetivo (list): Lista de nombres de archivos JSON a buscar
    
    Returns:
        dict: Diccionario con {nombre_archivo: DataFrame}
    """
    dataframes = {}
    
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        archivos_disponibles = zip_ref.namelist()
        print(f"üì¶ Archivos en ZIP: {len(archivos_disponibles)}")
        for archivo in archivos_disponibles:
            print(f"   - {archivo}")
        
        for archivo_objetivo in archivos_objetivo:
            # Buscar archivo que contenga el nombre objetivo
            archivo_encontrado = None
            for archivo in archivos_disponibles:
                if archivo_objetivo in archivo and archivo.endswith('.json'):
                    archivo_encontrado = archivo
                    break
            
            if not archivo_encontrado:
                print(f"\n‚ö†Ô∏è  No encontrado: {archivo_objetivo}")
                dataframes[archivo_objetivo] = pd.DataFrame()
                continue
            
            print(f"\nüìÑ Procesando: '{archivo_encontrado}'...")
            
            try:
                with zip_ref.open(archivo_encontrado, 'r') as f:
                    data = json.load(f)
                    
                    # Estructura: {"SELECT * FROM tabla": [{registro1}, {registro2}, ...]}
                    if isinstance(data, dict):
                        records = list(data.values())[0]
                        df = pd.DataFrame(records)
                        
                        # Reemplazar null por pd.NA
                        df = df.replace({None: pd.NA, 'null': pd.NA, 'NULL': pd.NA})
                        
                        print(f"  ‚úì {len(df):,} registros cargados")
                        print(f"  ‚úì Columnas: {list(df.columns)}")
                        
                        # Informaci√≥n sobre 'texto' y 'activo'
                        if 'texto' in df.columns:
                            texto_no_vacio = df['texto'].notna().sum()
                            print(f"  ‚úì Registros con texto: {texto_no_vacio:,} ({texto_no_vacio/len(df)*100:.1f}%)")
                        
                        if 'activo' in df.columns:
                            print(f"  ‚úì Distribuci√≥n activo:")
                            print(df['activo'].value_counts().to_string().replace('\n', '\n    '))
                        
                        dataframes[archivo_objetivo] = df
                    else:
                        print(f"  ‚ö†Ô∏è  Formato inesperado: {type(data)}")
                        dataframes[archivo_objetivo] = pd.DataFrame()
            
            except Exception as e:
                print(f"  ‚ùå Error: {e}")
                dataframes[archivo_objetivo] = pd.DataFrame()
    
    return dataframes

In [None]:
# Cargar archivos
ZIP_PATH = '../_tbl_subrespuesta__PRD_baseconocimientosdb_202511201112.zip'

ARCHIVOS_OBJETIVO = [
    '_tbl_segmento',
    '_tbl_subrespuesta'
]

dfs = cargar_json_desde_zip(ZIP_PATH, ARCHIVOS_OBJETIVO)

df_segmento = dfs['_tbl_segmento']
df_subrespuesta = dfs['_tbl_subrespuesta']

print(f"\n{'='*80}")
print(f"RESUMEN DE CARGA")
print(f"{'='*80}")
print(f"df_segmento: {len(df_segmento):,} filas")
print(f"df_subrespuesta: {len(df_subrespuesta):,} filas")

## 2. Funci√≥n de limpieza de HTML

### An√°lisis de la funci√≥n `clean_html_text`:

**Puntos fuertes:**
- ‚úÖ Usa BeautifulSoup para parsear HTML correctamente
- ‚úÖ Preserva saltos de l√≠nea (`<br>`)
- ‚úÖ Extrae URLs de m√∫ltiples tags
- ‚úÖ Maneja casos donde el input no es string

**Puntos de mejora identificados:**
1. ‚ùå **C√≥digo repetitivo**: La l√≥gica de extracci√≥n de URLs se repite para cada tag
2. ‚ùå **Try-except silencioso**: Captura excepciones sin logging, dificulta debug
3. ‚ùå **L√≠neas comentadas**: C√≥digo muerto (l√≠neas 48-49) deber√≠a removerse
4. ‚ùå **Par√°metros hardcodeados**: Separador `\n\n` no es configurable
5. ‚ùå **No maneja listas/tablas HTML**: `<ul>`, `<ol>`, `<table>` no tienen formato especial
6. ‚ö†Ô∏è  **Eficiencia**: Itera m√∫ltiples veces sobre tags en lugar de una sola pasada

### Versi√≥n mejorada:

In [None]:
def clean_html_text(html_text, preserve_links=True, line_separator='\n\n'):
    """
    Limpia texto con etiquetas HTML manteniendo estructura legible.
    
    Mejoras sobre versi√≥n original:
    - Par√°metros configurables (preserve_links, line_separator)
    - Mejor manejo de listas (<ul>, <ol>, <li>)
    - Logging de errores en lugar de try-except silencioso
    - C√≥digo m√°s limpio y DRY
    
    Args:
        html_text (str): Texto con etiquetas HTML
        preserve_links (bool): Si True, preserva URLs entre par√©ntesis
        line_separator (str): Separador para saltos de l√≠nea (default: '\n\n')
    
    Returns:
        str: Texto limpio
    """
    # Verificar tipo
    if not isinstance(html_text, str):
        return html_text
    
    if not html_text.strip():
        return html_text
    
    try:
        # Parsear HTML
        soup = BeautifulSoup(html_text, "html.parser")
        
        # Reemplazar <br> con salto de l√≠nea
        for br in soup.find_all("br"):
            br.replace_with(line_separator)
        
        # Agregar bullets a items de lista
        for li in soup.find_all("li"):
            li.insert(0, "‚Ä¢ ")
            li.append(line_separator)
        
        # Agregar saltos despu√©s de p√°rrafos
        for p in soup.find_all("p"):
            p.append(line_separator)
        
        # Procesar enlaces si preserve_links=True
        if preserve_links:
            # Tags y sus atributos que contienen URLs
            url_tags = {
                'a': 'href',
                'img': 'src',
                'script': 'src',
                'audio': 'src',
                'video': 'src',
                'iframe': 'src',
                'link': 'href',
                'area': 'href'
            }
            
            for tag_name, attr in url_tags.items():
                for tag in soup.find_all(tag_name):
                    url = tag.get(attr, '')
                    if url and ('http://' in url or 'https://' in url):
                        # Para enlaces <a>, preservar texto + URL
                        if tag_name == 'a':
                            text = tag.get_text().strip()
                            if text:
                                tag.replace_with(f"{text} ({url})")
                            else:
                                tag.replace_with(f"({url})")
                        else:
                            # Para otros tags, solo URL
                            tag.replace_with(f" ({url}) ")
        
        # Obtener texto limpio
        text = soup.get_text()
        
        # Limpiar espacios m√∫ltiples en cada l√≠nea
        lines = text.split('\n')
        clean_lines = [re.sub(r'\s+', ' ', line.strip()) for line in lines]
        
        # Remover l√≠neas vac√≠as y unir
        clean_text = line_separator.join(filter(None, clean_lines))
        
        # Limpiar entidades HTML restantes
        clean_text = clean_text.replace('&nbsp;', ' ')
        clean_text = clean_text.replace('&amp;', '&')
        clean_text = clean_text.replace('&lt;', '<')
        clean_text = clean_text.replace('&gt;', '>')
        
        return clean_text.strip()
    
    except Exception as e:
        # En producci√≥n, usar logging en lugar de print
        print(f"‚ö†Ô∏è  Error limpiando HTML: {e}")
        print(f"   Texto problem√°tico (primeros 100 chars): {str(html_text)[:100]}")
        return html_text

## 3. Aplicar limpieza a columnas 'texto'

In [None]:
print("="*80)
print("APLICANDO LIMPIEZA DE HTML")
print("="*80)

# Aplicar a df_segmento
print("\nüìÑ Limpiando df_segmento...")
if 'texto' in df_segmento.columns:
    df_segmento['texto_limpio'] = df_segmento['texto'].apply(clean_html_text)
    print(f"  ‚úì {len(df_segmento):,} registros procesados")
    
    # Comparar antes/despu√©s
    sample_idx = df_segmento[df_segmento['texto'].notna()].index[0]
    print(f"\n  üìã Ejemplo de limpieza (registro {sample_idx}):")
    print(f"  ANTES: {df_segmento.loc[sample_idx, 'texto'][:200]}...")
    print(f"  DESPU√âS: {df_segmento.loc[sample_idx, 'texto_limpio'][:200]}...")
else:
    print("  ‚ö†Ô∏è  No hay columna 'texto' en df_segmento")

# Aplicar a df_subrespuesta
print("\nüìÑ Limpiando df_subrespuesta...")
if 'texto' in df_subrespuesta.columns:
    df_subrespuesta['texto_limpio'] = df_subrespuesta['texto'].apply(clean_html_text)
    print(f"  ‚úì {len(df_subrespuesta):,} registros procesados")
    
    # Comparar antes/despu√©s
    sample_idx = df_subrespuesta[df_subrespuesta['texto'].notna()].index[0]
    if pd.notna(df_subrespuesta.loc[sample_idx, 'texto']):
        print(f"\n  üìã Ejemplo de limpieza (registro {sample_idx}):")
        print(f"  ANTES: {df_subrespuesta.loc[sample_idx, 'texto'][:200]}...")
        print(f"  DESPU√âS: {df_subrespuesta.loc[sample_idx, 'texto_limpio'][:200]}...")
else:
    print("  ‚ö†Ô∏è  No hay columna 'texto' en df_subrespuesta")

## 4. Cruzar bases por id_pregunta y consolidar textos

Estrategia:
1. Hacer **outer join** por `id_pregunta` para capturar registros de ambas bases
2. Consolidar todos los textos de cada `id_pregunta` en un solo campo
3. Analizar campo `activo` en los cruces

In [None]:
print("="*80)
print("CRUCE DE BASES POR id_pregunta")
print("="*80)

# Preparar dataframes para el merge
# Agregar sufijos para identificar origen
df_merged = pd.merge(
    df_segmento,
    df_subrespuesta,
    on='id_pregunta',
    how='outer',
    suffixes=('_segmento', '_subrespuesta')
)

print(f"\nüìä Resultados del merge:")
print(f"  Total registros despu√©s del merge: {len(df_merged):,}")
print(f"  id_pregunta √∫nicos: {df_merged['id_pregunta'].nunique():,}")

# Analizar origen de registros
solo_segmento = df_merged['idtbl_segmento'].notna() & df_merged['idtbl_subrespuesta'].isna()
solo_subrespuesta = df_merged['idtbl_segmento'].isna() & df_merged['idtbl_subrespuesta'].notna()
en_ambas = df_merged['idtbl_segmento'].notna() & df_merged['idtbl_subrespuesta'].notna()

print(f"\nüìã Distribuci√≥n por origen:")
print(f"  Solo en df_segmento: {solo_segmento.sum():,} registros")
print(f"  Solo en df_subrespuesta: {solo_subrespuesta.sum():,} registros")
print(f"  En ambas bases: {en_ambas.sum():,} registros")

# Consolidar textos por id_pregunta
print(f"\nüîÑ Consolidando textos por id_pregunta...")

def consolidar_textos(row):
    """
    Consolida textos de segmento y subrespuesta en un solo texto.
    """
    textos = []
    
    # Texto de segmento
    if pd.notna(row.get('texto_limpio_segmento')):
        texto_seg = str(row['texto_limpio_segmento']).strip()
        if texto_seg:
            textos.append(f"[SEGMENTO] {texto_seg}")
    
    # Texto de subrespuesta
    if pd.notna(row.get('texto_limpio_subrespuesta')):
        texto_sub = str(row['texto_limpio_subrespuesta']).strip()
        if texto_sub:
            textos.append(f"[SUBRESPUESTA] {texto_sub}")
    
    return '\n\n'.join(textos) if textos else pd.NA

df_merged['texto_consolidado'] = df_merged.apply(consolidar_textos, axis=1)

# Estad√≠sticas de consolidaci√≥n
registros_con_texto = df_merged['texto_consolidado'].notna().sum()
print(f"  ‚úì Registros con texto consolidado: {registros_con_texto:,} ({registros_con_texto/len(df_merged)*100:.1f}%)")

# Mostrar ejemplo
ejemplo = df_merged[df_merged['texto_consolidado'].notna()].iloc[0]
print(f"\n  üìã Ejemplo de texto consolidado (id_pregunta={ejemplo['id_pregunta']}):")
print(f"  {ejemplo['texto_consolidado'][:500]}...")

## 5. An√°lisis del campo 'activo'

In [None]:
print("="*80)
print("AN√ÅLISIS DEL CAMPO 'activo'")
print("="*80)

# An√°lisis general
print("\nüìä Distribuci√≥n de 'activo' en cada base:")
print("\ndf_segmento:")
if 'activo_segmento' in df_merged.columns:
    print(df_merged['activo_segmento'].value_counts(dropna=False).to_string())

print("\ndf_subrespuesta:")
if 'activo_subrespuesta' in df_merged.columns:
    print(df_merged['activo_subrespuesta'].value_counts(dropna=False).to_string())

# An√°lisis de cruces
print(f"\n{'='*80}")
print("AN√ÅLISIS DE CRUCES (registros presentes en AMBAS bases)")
print(f"{'='*80}")

df_cruces = df_merged[en_ambas].copy()
print(f"\nTotal de cruces: {len(df_cruces):,} registros")

if len(df_cruces) > 0:
    # Crear categor√≠as de an√°lisis
    df_cruces['categoria_activo'] = 'Otro'
    
    # Ambos activo=1
    ambos_activos = (df_cruces['activo_segmento'] == 1) & (df_cruces['activo_subrespuesta'] == 1)
    df_cruces.loc[ambos_activos, 'categoria_activo'] = 'Ambos activo=1'
    
    # Ambos activo=0
    ambos_inactivos = (df_cruces['activo_segmento'] == 0) & (df_cruces['activo_subrespuesta'] == 0)
    df_cruces.loc[ambos_inactivos, 'categoria_activo'] = 'Ambos activo=0'
    
    # Solo segmento activo
    solo_seg_activo = (df_cruces['activo_segmento'] == 1) & (df_cruces['activo_subrespuesta'] == 0)
    df_cruces.loc[solo_seg_activo, 'categoria_activo'] = 'Solo segmento activo=1'
    
    # Solo subrespuesta activo
    solo_sub_activo = (df_cruces['activo_segmento'] == 0) & (df_cruces['activo_subrespuesta'] == 1)
    df_cruces.loc[solo_sub_activo, 'categoria_activo'] = 'Solo subrespuesta activo=1'
    
    # Resumen
    print("\nüìã Distribuci√≥n de estados 'activo' en cruces:")
    resumen = df_cruces['categoria_activo'].value_counts()
    for categoria, count in resumen.items():
        porcentaje = count / len(df_cruces) * 100
        print(f"  {categoria}: {count:,} ({porcentaje:.1f}%)")
    
    # Insight clave
    print(f"\n{'='*80}")
    print("INSIGHTS CLAVE")
    print(f"{'='*80}")
    
    if ambos_activos.sum() == len(df_cruces):
        print("‚úÖ Todos los cruces tienen activo=1 en ambas bases (consistencia perfecta)")
    elif ambos_inactivos.sum() == len(df_cruces):
        print("‚ö†Ô∏è  Todos los cruces tienen activo=0 en ambas bases")
    else:
        inconsistencias = solo_seg_activo.sum() + solo_sub_activo.sum()
        porc_incons = inconsistencias / len(df_cruces) * 100
        print(f"‚ö†Ô∏è  Hay {inconsistencias:,} cruces ({porc_incons:.1f}%) con valores diferentes de 'activo'")
        print(f"    Esto podr√≠a indicar desincronizaci√≥n entre las bases")
    
    # Tabla resumen
    print("\nüìä Tabla de contingencia (segmento x subrespuesta):")
    contingencia = pd.crosstab(
        df_cruces['activo_segmento'],
        df_cruces['activo_subrespuesta'],
        margins=True,
        margins_name='TOTAL'
    )
    contingencia.index.name = 'activo_segmento'
    contingencia.columns.name = 'activo_subrespuesta'
    print(contingencia)
else:
    print("\n‚ö†Ô∏è  No hay registros que crucen en ambas bases")

## 6. Exportar resultados consolidados

In [None]:
# Crear DataFrame final consolidado por id_pregunta
df_consolidado_final = df_merged.groupby('id_pregunta').agg({
    'texto_consolidado': lambda x: '\n\n---\n\n'.join(x.dropna()),
    'activo_segmento': lambda x: x.mode()[0] if len(x.mode()) > 0 else pd.NA,
    'activo_subrespuesta': lambda x: x.mode()[0] if len(x.mode()) > 0 else pd.NA,
    'titulo_segmento': 'first',
    'titulo_subrespuesta': 'first'
}).reset_index()

print(f"\nüì¶ DataFrame consolidado final:")
print(f"  {len(df_consolidado_final):,} id_pregunta √∫nicos")
print(f"\nPrimeras 5 filas:")
print(df_consolidado_final.head())

# Opcional: Exportar a CSV
# df_consolidado_final.to_csv('../Salidas/base_conocimiento_consolidada.csv', index=False)
# print("\n‚úÖ Exportado a: ../Salidas/base_conocimiento_consolidada.csv")