# Importación de Datos Censales a Elasticsearch

Este notebook explica el proceso de importación de datos censales del INEGI a Elasticsearch utilizando Python. El código está diseñado para procesar múltiples archivos CSV con información geográfica y demográfica, definir esquemas de datos (mappings) apropiados y cargar eficientemente los datos en Elasticsearch.

## Estructura de los Datos

Los datos censales están organizados en varias tablas:

- **cat_distrito_2020**: Catálogo de distritos electorales
- **cat_seccion_2020**: Catálogo de secciones electorales
- **ine_distrito_2020**: Datos demográficos y socioeconómicos a nivel distrito
- **ine_entidad_2020**: Datos demográficos y socioeconómicos a nivel entidad federativa
- **ine_seccion_2020**: Datos demográficos y socioeconómicos a nivel sección electoral

Estos datos provienen del Censo de Población y Vivienda 2020, procesados para su uso electoral.


## 1. Configuración Inicial

Comenzamos importando las bibliotecas necesarias y configurando el sistema de logging:

In [None]:
import pandas as pd
from elasticsearch import Elasticsearch, helpers
import os
import logging
import time
from typing import Dict, List, Optional, Tuple

# Configuración de logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("elastic_import.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

### Explicación:

- **pandas**: Para leer y procesar archivos CSV
- **elasticsearch**: Cliente Python para interactuar con Elasticsearch
- **helpers.bulk**: Permite realizar inserciones masivas (bulk) más eficientes
- **logging**: Configuramos un sistema de logs para registrar todos los eventos y mensajes importantes
- **typing**: Para usar anotaciones de tipo que hacen más legible el código

## 2. Conexión a Elasticsearch

Función para establecer conexión con el servidor de Elasticsearch:

In [None]:
def connect_elasticsearch():
    try:
        es = Elasticsearch("http://localhost:9200")
        
        if es.ping():
            logger.info("Conexión a Elasticsearch establecida")
            return es
        else:
            logger.error("No se pudo establecer conexión con Elasticsearch")
            return None
    except Exception as e:
        logger.error(f"Error al conectar con Elasticsearch: {str(e)}")
        return None

### Explicación:

- Creamos una instancia del cliente Elasticsearch conectándonos al servidor local (por defecto en el puerto 9200)
- Usamos el método ping() para verificar que la conexión está activa
- La función devuelve el cliente si la conexión es exitosa, o None si hay algún problema

## 3. Definición de Mappings para Elasticsearch

Los mappings en Elasticsearch definen cómo se almacenan e indexan los documentos y sus campos. Esta es la estructura del mapping:

In [None]:
mapping_basico = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
        "refresh_interval": "5s",
        "analysis": {
            "analyzer": {
                "spanish_analyzer": {"type": "spanish"}
            }
        }
    },
    "mappings": {
        "properties": {
            "id_field": {"type": "keyword"},
            "text_field": {
                "type": "text",
                "analyzer": "spanish_analyzer",
                "fields": {
                    "keyword": {"type": "keyword"}
                }
            },
            "numeric_field": {"type": "integer"},
            "float_field": {"type": "float"}
        }
    }
}

### Componentes principales del mapping:

1. **Settings**:
   - number_of_shards: Controla cómo se distribuyen los datos (1 para índices pequeños)
   - number_of_replicas: Controla la redundancia (1 significa una copia adicional)
   - refresh_interval: Tiempo entre actualizaciones del índice
   - analysis: Configuración de tokenizadores y analizadores

2. **Mappings** (Propiedades de campos):
   - keyword: Para valores exactos (IDs, códigos, categorías) - no tokenizado
   - text: Para texto completo que necesita ser buscable - tokenizado
   - integer, float: Para valores numéricos
   - date: Para fechas
   - Campos con múltiples representaciones (text + keyword)

3. **Analizadores**:
   - spanish_analyzer: Analizador específico para español que maneja correctamente acentos, plurales, etc.

### Análisis del Mapping Específico

Ahora examinemos una parte del mapping real utilizado en el código:

In [None]:
cat_seccion_mapping_example = {
    "mappings": {
        "properties": {
            "CVE_ENT": {"type": "keyword"}, # Clave de entidad (no necesita búsqueda por token)
            "CVE_DISTRITO": {"type": "keyword"}, # Clave de distrito (búsqueda exacta)
            "CVE_MUN": {"type": "keyword"}, # Clave de municipio (búsqueda exacta)
            "DESC_MUN": { # Nombre del municipio (necesita búsqueda por texto)
                "type": "text", 
                "analyzer": "spanish_analyzer", # Usar analizador español para manejar acentos
                "fields": {"keyword": {"type": "keyword"}} # También permitir búsqueda exacta
            },
            "CVE_SECCION": {"type": "keyword"}, # Clave de sección (búsqueda exacta)
            "DESC_SECCION": { # Descripción de sección
                "type": "text",
                "analyzer": "spanish_analyzer",
                "fields": {"keyword": {"type": "keyword"}}
            }
        }
    }
}

### Parte del mapping para datos demográficos (INE)

Los índices con datos censales tienen un mapping más complejo con muchos campos numéricos:

In [None]:
ine_seccion_mapping_example = {
    "mappings": {
        "properties": {
            "ID": {"type": "integer"},  # ID único de la sección
            "ENTIDAD": {"type": "integer"}, # ID de entidad
            "DISTRITO": {"type": "integer"}, # ID de distrito
            "MUNICIPIO": {"type": "integer"}, # ID de municipio
            "SECCION": {"type": "integer"}, # ID de sección electoral
            "TIPO": {"type": "integer"}, # Tipo de sección
            
            # Datos demográficos
            "POBTOT": {"type": "integer"}, # Población total
            "POBFEM": {"type": "integer"}, # Población femenina
            "POBMAS": {"type": "integer"}, # Población masculina
            
            # Datos por grupos de edad (ejemplo)
            "P_0A2": {"type": "integer"}, # Población de 0 a 2 años
            "P_0A2_F": {"type": "integer"}, # Población femenina de 0 a 2 años
            "P_0A2_M": {"type": "integer"}, # Población masculina de 0 a 2 años
            
            # Ratios y promedios
            "REL_H_M": {"type": "float"}, # Relación hombres-mujeres
            "GRAPROES": {"type": "float"}, # Grado promedio de escolaridad
            "PROM_OCUP": {"type": "float"} # Promedio de ocupantes por vivienda
        }
    }
}

### Puntos clave sobre el mapping:

1. **Campos de identificación**: Definidos como keyword para permitir búsquedas exactas y filtros

2. **Campos de texto descriptivo**: Definidos como text con el analizador en español para búsquedas de texto completo
   - También incluyen una versión keyword para agregaciones y filtros exactos

3. **Datos demográficos**: Principalmente integer para conteos de población

4. **Ratios y promedios**: Definidos como float para valores decimales

5. **Optimización**:
   - Se configura una réplica para redundancia
   - Se usa el analizador en español para mejor búsqueda en texto en español

## 4. Creación de Índices en Elasticsearch

Función para crear los índices con los mappings definidos:

In [None]:
def create_indices(es, mappings):
    created_indices = []
    failed_indices = []
    
    for index_name, mapping in mappings.items():
        try:
            if not es.indices.exists(index=index_name):
                es.indices.create(index=index_name, body=mapping)
                logger.info(f"Índice {index_name} creado con éxito")
                created_indices.append(index_name)
            else:
                logger.info(f"Índice {index_name} ya existe")
                created_indices.append(index_name)
        except Exception as e:
            logger.error(f"Error al crear índice {index_name}: {str(e)}")
            failed_indices.append(index_name)
    
    return created_indices, failed_indices

### Explicación:

- Repite a través de todos los mappings definidos
- Verifica si el índice ya existe para evitar errores
- Crea cada índice con su mapping específico
- Registra los índices creados con éxito y los que fallaron
- Devuelve dos listas: índices creados y fallidos

## 5. Procesamiento de Archivos CSV

Función para leer y procesar los archivos CSV antes de importarlos a Elasticsearch:

In [None]:
def process_csv_data(csv_path: str) -> Optional[pd.DataFrame]:
    try:
        logger.info(f"Leyendo archivo {csv_path}")
        df = pd.read_csv(csv_path, encoding='latin-1')
        
        df = df.where(pd.notnull(df), None)  # Convertir valores NaN a None para ES
        
        for col in df.columns:
            if df[col].dtype == 'float64' and df[col].isnull().any():
                df[col] = df[col].astype('float')
        
        logger.info(f"CSV leído correctamente con {len(df)} registros")
        return df
    except Exception as e:
        logger.error(f"Error procesando CSV {csv_path}: {str(e)}")
        return None

### Explicación:

1. **Lectura del CSV**:
   - Usa la codificación latin-1 que es adecuada para archivos con caracteres españoles

2. **Limpieza de datos**:
   - Convierte valores NaN a None (que es el valor nulo que entiende Elasticsearch)
   - Maneja correctamente los campos de tipo float que pueden tener valores nulos

3. **Manejo de errores**:
   - Captura cualquier error durante el procesamiento
   - Devuelve None si hay algún problema, lo que permite a la función principal manejar la situación

## 6. Importación de Datos a Elasticsearch

Función para importar datos del DataFrame a Elasticsearch usando la API Bulk:

In [None]:
def import_csv_to_elastic(
    es,
    df: pd.DataFrame,
    index_name: str,
    id_field: Optional[str] = None,
    batch_size: int = 5000
) -> Tuple[int, int]:
    
    success_count = 0
    error_count = 0
    
    try:
        total_records = len(df)
        logger.info(f"Preparando {total_records} documentos para indexar en {index_name}")
        
        # Procesar por lotes para optimizar rendimiento
        for i in range(0, total_records, batch_size):
            batch_df = df.iloc[i:i+batch_size]
            actions = []
            
            for _, row in batch_df.iterrows():
                doc = row.to_dict()
                action = {
                    "_index": index_name,
                    "_source": doc
                }
                
                # Usar un campo específico como ID si se indica
                if id_field and id_field in doc and doc[id_field] is not None:
                    action["_id"] = str(doc[id_field])
                    
                actions.append(action)
            
            # Ejecutar operación bulk
            if actions:
                success, failed = helpers.bulk(
                    es, 
                    actions, 
                    stats_only=True,
                    raise_on_error=False,
                    max_retries=3
                )
                success_count += success
                error_count += failed
                logger.info(f"Lote {i//batch_size + 1}: Indexados {success} documentos, fallidos: {failed}")
        
        logger.info(f"Importación a {index_name} completada: {success_count} éxitos, {error_count} errores")
        return success_count, error_count
        
    except Exception as e:
        logger.error(f"Error en importación a {index_name}: {str(e)}")
        return success_count, error_count + (total_records - success_count)

### Explicación:

1. **Procesamiento por lotes**:
   - Divide el DataFrame en lotes (batches) para procesar grandes volúmenes de datos eficientemente
   - El tamaño del lote predeterminado es 5000 documentos

2. **Preparación de documentos**:
   - Convierte cada fila del DataFrame a un diccionario
   - Define la acción para cada documento (index, id, etc.)
   - Asigna IDs específicos si se proporciona el campo id_field

3. **Operación Bulk**:
   - Utiliza helpers.bulk para enviar múltiples documentos en una sola operación
   - Parámetros:
     - stats_only=True: Solo devuelve conteos sin detalles de error
     - raise_on_error=False: No detiene la operación si algunos documentos fallan
     - max_retries=3: Reintenta hasta 3 veces en caso de fallos temporales

4. **Seguimiento de resultados**:
   - Cuenta documentos indexados correctamente y errores
   - Registra el progreso por lotes

## 7. Función Principal

La función principal (main) que orquesta todo el proceso:

In [None]:
def main():
    start_time = time.time()
    logger.info("Iniciando proceso de importación de datos censales")
    
    # Conectar a Elasticsearch
    es = connect_elasticsearch()
    if not es:
        logger.error("No se puede continuar sin conexión a Elasticsearch")
        return
    
    # Obtener mappings y crear índices
    mappings = get_mappings()
    created_indices, failed_indices = create_indices(es, mappings)
    if failed_indices:
        logger.warning(f"Algunos índices no pudieron crearse: {failed_indices}")
    
    # Configuración de tablas y archivos CSV
    tables_config = {
        "cat_distrito_2020": {
            "csv_file": "cat_distritos_2020.csv",
            "id_field": "CVE_DISTRITO"
        },
        "cat_seccion_2020": {
            "csv_file": "cat_secciones_2020.csv",
            "id_field": "CVE_SECCION"
        },
        "ine_distrito_2020": {
            "csv_file": "INE_DISTRITO_2020.CSV",
            "id_field": "DISTRITO"
        },
        "ine_entidad_2020": {
            "csv_file": "INE_ENTIDAD_2020.CSV",
            "id_field": "ENT"
        },
        "ine_seccion_2020": {
            "csv_file": "INE_SECCION_2020.csv",
            "id_field": "ID"
        }
    }
    
    # Directorio donde se encuentran los CSV
    csv_dir = "./eceg_2020_csv/"  # Ajustar según estructura
    
    # Variables para acumular resultados
    total_success = 0
    total_errors = 0
    processed_tables = []
    failed_tables = []
    
    # Procesar cada tabla
    for index_name, config in tables_config.items():
        if index_name not in created_indices:
            logger.warning(f"Omitiendo tabla {index_name} porque el índice no existe")
            continue
            
        csv_path = os.path.join(csv_dir, config["csv_file"])
        
        if not os.path.exists(csv_path):
            logger.error(f"No se encontró el archivo {csv_path}")
            failed_tables.append(index_name)
            continue
            
        # Procesar CSV
        df = process_csv_data(csv_path)
        if df is None:
            failed_tables.append(index_name)
            continue
            
        # Importar a Elasticsearch
        success, errors = import_csv_to_elastic(
            es, 
            df, 
            index_name,
            config.get("id_field"),
            batch_size=1000
         )

        # Actualizar contadores
        total_success += success
        total_errors += errors
        
        if errors == 0:
            processed_tables.append(index_name)
        else:
            failed_tables.append(f"{index_name} (parcial: {success}/{success+errors})")
    
    # Resumen final
    elapsed_time = time.time() - start_time
    logger.info("=" * 60)
    logger.info(f"Proceso completado en {elapsed_time:.2f} segundos")
    logger.info(f"Total documentos indexados: {total_success}")
    logger.info(f"Total errores: {total_errors}")
    logger.info(f"Tablas procesadas correctamente: {len(processed_tables)}")
    logger.info(f"Tablas con errores: {len(failed_tables)}")
    
    if processed_tables:
        logger.info(f"Tablas OK: {', '.join(processed_tables)}")
    if failed_tables:
        logger.warning(f"Tablas con errores: {', '.join(failed_tables)}")
    
    # Conteo final de documentos en cada índice
    for index_name in created_indices:
        try:
            count = es.count(index=index_name)["count"]
            logger.info(f"Índice {index_name}: {count} documentos")
        except Exception as e:
            logger.error(f"Error al contar documentos en {index_name}: {str(e)}")
    
    logger.info("=" * 60)

### Explicación del main():

1. **Inicialización**:
   - Registra el tiempo de inicio
   - Establece conexión con Elasticsearch

2. **Preparación de índices**:
   - Obtiene los mappings
   - Crea los índices necesarios

3. **Configuración**:
   - Define la configuración para cada tabla: archivo CSV y campo ID
   - Establece el directorio donde se encuentran los archivos

4. **Procesamiento de cada tabla**:
   - Verifica que el índice se haya creado correctamente
   - Comprueba que existe el archivo CSV
   - Lee y procesa el CSV
   - Importa los datos a Elasticsearch
   - Registra éxitos y errores

5. **Resumen y estadísticas**:
   - Tiempo total de ejecución
   - Documentos indexados correctamente y errores
   - Tablas procesadas con éxito y fallidas
   - Conteo final de documentos en cada índice

## 8. Ejecución del Programa

Finalmente, el punto de entrada del script:

In [None]:
if __name__ == "__main__":
    main()

## 9. Ejemplos de consultas

A continuación, se mostrarán ejemplos de consultas que pueden realizar:

In [None]:
from elasticsearch import Elasticsearch
import pandas as pd

# Conexión a Elasticsearch para hacer las consultas (ignoren esta parte y pasen a las colsultas 
# que estan debajo xdddd)
es = Elasticsearch("http://localhost:9200")

if es.ping():
    print("Conectado a Elasticsearch!")
else:
    print("No se pudo conectar a Elasticsearch.")

def display_results(response, agg_key=None):
    if agg_key:
        if 'aggregations' in response and agg_key in response['aggregations']:
            print(f"Resultados de la agregación '{agg_key}':")
            buckets = response['aggregations'][agg_key].get('buckets')
            if buckets:
                df = pd.DataFrame(buckets)
                print(df.to_string())
            else:
                print(response['aggregations'][agg_key])
        else:
            print(f"No se encontró la clave de agregación '{agg_key}' o no hay agregaciones.")
    elif 'hits' in response and 'hits' in response['hits']:
        hits = response['hits']['hits']
        if hits:
            df = pd.DataFrame([hit['_source'] for hit in hits])
            print(f"Mostrando {len(hits)} de {response['hits']['total']['value']} resultados:")
            print(df.to_string())
        else:
            print("La consulta no devolvió resultados.")
    else:
        print("Respuesta no reconocida o vacía.")
    print("-" * 50)



In [None]:
# Consulta 1: Entidades con POBTOT entre 1M y 3M, ordenadas
index_name = "ine_entidad_2020"
query_body = {
    "query": {
        "range": {
            "POBTOT": {
                "gte": 1000000,
                "lte": 3000000
            }
        }
    },
    "sort": [
        {"POBTOT": {"order": "desc"}}
    ],
    "size": 10
}

print(f"Consulta 1: Entidades con POBTOT entre 1M y 3M en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [83]:
# Consulta 2: Secciones de la entidad '09' con POBTOT > 5000
index_name = "ine_seccion_2020"
entidad_id_ejemplo = 9

query_body = {
    "query": {
        "bool": {
            "must": [
                {"term": {"ENTIDAD": entidad_id_ejemplo}},
                {"range": {"POBTOT": {"gt": 5000}}}
            ]
        }
    },
    "size": 5
}

print(f"Consulta 2: Secciones de entidad {entidad_id_ejemplo} con POBTOT > 5000 en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


Consulta 2: Secciones de entidad 9 con POBTOT > 5000 en ine_seccion_2020...
Mostrando 5 de 29 resultados:
       ID  ENTIDAD  DISTRITO  MUNICIPIO  SECCION  TIPO   POBTOT  POBFEM  POBMAS  P_0A2  P_0A2_F  P_0A2_M  P_0A17  P_3YMAS  P_3YMAS_F  P_3YMAS_M  P_5YMAS  P_5YMAS_F  P_5YMAS_M  P_12YMAS  P_12YMAS_F  P_12YMAS_M  P_15YMAS  P_15YMAS_F  P_15YMAS_M  P_18YMAS  P_18YMAS_F  P_18YMAS_M  P_3A5  P_3A5_F  P_3A5_M  P_6A11  P_6A11_F  P_6A11_M  P_8A14  P_8A14_F  P_8A14_M  P_12A14  P_12A14_F  P_12A14_M  P_15A17  P_15A17_F  P_15A17_M  P_18A24  P_18A24_F  P_18A24_M  P_15A49_F  P_60YMAS  P_60YMAS_F  P_60YMAS_M  REL_H_M  POB0_14  POB15_64  POB65_MAS  POB_EDADNE  PROM_HNV  PNACENT  PNACENT_F  PNACENT_M  PNACOE  PNACOE_F  PNACOE_M  PRES2015  PRES2015_F  PRES2015_M  PRESOE15  PRESOE15_F  PRESOE15_M  P3YM_HLI  P3YM_HLI_F  P3YM_HLI_M  P3HLINHE  P3HLINHE_F  P3HLINHE_M  P3HLI_HE  P3HLI_HE_F  P3HLI_HE_M  P5_HLI  P5_HLI_NHE  P5_HLI_HE  PHOG_IND  POB_AFRO  POB_AFRO_F  POB_AFRO_M  PCON_DISC  PCDISC_MOT  PCDISC_VI

In [None]:
# Consulta 3: Promedio de POBTOT y suma de VIVTOT por TIPO de sección
index_name = "ine_seccion_2020"
query_body = {
    "size": 0,
    "aggs": {
        "stats_por_tipo_seccion": {
            "terms": {"field": "TIPO"},
            "aggs": {
                "prom_poblacion": {
                    "avg": {"field": "POBTOT"}
                },
                "total_viviendas": {
                    "sum": {"field": "VIVTOT"}
                }
            }
        }
    }
}

print(f"Consulta 3: Agregación por TIPO de sección en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response, agg_key="stats_por_tipo_seccion")

if 'aggregations' in response and 'stats_por_tipo_seccion' in response['aggregations']:
    for bucket in response['aggregations']['stats_por_tipo_seccion']['buckets']:
        print(f"Tipo de Sección: {bucket['key']}")
        print(f"  Documentos: {bucket['doc_count']}")
        print(f"  Población Promedio: {bucket['prom_poblacion']['value']:.2f}")
        print(f"  Total Viviendas: {bucket['total_viviendas']['value']}")
        print("-" * 20)


In [None]:
# Consulta 4: Distritos de la entidad '15' donde el campo INDIGENA existe
index_name = "ine_distrito_2020"
entidad_ejemplo = "15"

query_body = {
    "query": {
        "bool": {
            "must": [
                {"term": {"ENTIDAD": entidad_ejemplo}}
            ],
            "filter": [
                {"exists": {"field": "INDIGENA"}}
            ]
        }
    },
    "size": 5
}

print(f"Consulta 4: Distritos de entidad {entidad_ejemplo} con campo INDIGENA en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [89]:
# Consulta 5: Estadísticas completas para POBTOT en ine_seccion_2020
index_name = "ine_seccion_2020"
query_body = {
    "size": 0,
    "aggs": {
        "stats_poblacion_seccion": {
            "stats": {"field": "POBTOT"}
        }
    }
}

print(f"Consulta 5: Estadísticas de POBTOT en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response, agg_key="stats_poblacion_seccion")


Consulta 5: Estadísticas de POBTOT en ine_seccion_2020...
Resultados de la agregación 'stats_poblacion_seccion':
{'count': 12110, 'min': 0.0, 'max': 39780.0, 'avg': 2080.162758051197, 'sum': 25190771.0}
--------------------------------------------------


In [None]:
# Consulta 6: Conteo de distritos por cuales son los que tienen más hombres que mujeres
#              y cuales son los que tienen una alta escolaridad
index_name = "ine_distrito_2020"
query_body = {
    "size": 0,
    "aggs": {
        "distritos_filtrados": {
            "filters": {
                "filters": {
                    "mas_mujeres_que_hombres": {
                        "script": {
                            "script": "doc['POBFEM'].value > doc['POBMAS'].value"
                        }
                    },
                    "alta_escolaridad": {
                        "range": {
                            "GRAPROES": {"gt": 9.5}
                        }
                    }
                }
            }
        }
    }
}

print(f"Consulta 6: Agregación con múltiples filtros en {index_name}...")
response = es.search(index=index_name, body=query_body)

if 'aggregations' in response and 'distritos_filtrados' in response['aggregations']:
    print("Resultados de la agregación 'distritos_filtrados':")
    buckets = response['aggregations']['distritos_filtrados'].get('buckets')
    if buckets:
        if isinstance(buckets, dict):
            for filter_name, data in buckets.items():
                print(f"  Filtro '{filter_name}': {data['doc_count']} distritos")
        else:
             print(pd.DataFrame(buckets).to_string())
    else:
        print("No se encontraron buckets para la agregación.")
else:
    print("No se encontró la clave de agregación 'distritos_filtrados' o no hay agregaciones.")
print("-" * 50)



In [None]:
# Consulta 7: Entidades cuyo NOM_ENT.keyword comienza con "Aguas"
index_name = "ine_entidad_2020"
query_body = {
    "query": {
        "prefix": {
            "NOM_ENT .keyword": "Aguas" # Es con espacio por un error al ingresarlo al Elastic :v
        }
    },
    "size": 5
}

print(f"Consulta 7: Buscando entidades con prefijo 'Aguas' en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [None]:
# Consulta 8: Entidades con NOM_ENT que contenga "México"
index_name = "ine_entidad_2020"
query_body = {
    "query": {
        "match_phrase": {
            "NOM_ENT ": "México" # El analizador por defecto (o el spanish_analyzer si NOM_ENT lo usa) se aplicará
        }
    }
}

print(f"Consulta 8: Buscando frase 'Estado de México' en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [77]:
# Consulta 9: Secciones que cumplen al menos una de varias condiciones
index_name = "ine_seccion_2020"
distrito_ejemplo = 1

query_body = {
    "query": {
        "bool": {
            "should": [
                {"range": {"POBTOT": {"gt": 10000}}},
                {"range": {"VIVTOT": {"gt": 3000}}},
                {"term": {"DISTRITO": distrito_ejemplo}}
            ],
            "minimum_should_match": 1
        }
    },
    "size": 5
}

print(f"Consulta 9: Buscando secciones con POBTOT > 10k O VIVTOT > 3k O DISTRITO = {distrito_ejemplo} en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


Consulta 9: Buscando secciones con POBTOT > 10k O VIVTOT > 3k O DISTRITO = 1 en ine_seccion_2020...
Mostrando 5 de 1126 resultados:
        ID  ENTIDAD  DISTRITO  MUNICIPIO  SECCION  TIPO   POBTOT  POBFEM  POBMAS   P_0A2  P_0A2_F  P_0A2_M  P_0A17  P_3YMAS  P_3YMAS_F  P_3YMAS_M  P_5YMAS  P_5YMAS_F  P_5YMAS_M  P_12YMAS  P_12YMAS_F  P_12YMAS_M  P_15YMAS  P_15YMAS_F  P_15YMAS_M  P_18YMAS  P_18YMAS_F  P_18YMAS_M  P_3A5  P_3A5_F  P_3A5_M  P_6A11  P_6A11_F  P_6A11_M  P_8A14  P_8A14_F  P_8A14_M  P_12A14  P_12A14_F  P_12A14_M  P_15A17  P_15A17_F  P_15A17_M  P_18A24  P_18A24_F  P_18A24_M  P_15A49_F  P_60YMAS  P_60YMAS_F  P_60YMAS_M  REL_H_M  POB0_14  POB15_64  POB65_MAS  POB_EDADNE  PROM_HNV  PNACENT  PNACENT_F  PNACENT_M  PNACOE  PNACOE_F  PNACOE_M  PRES2015  PRES2015_F  PRES2015_M  PRESOE15  PRESOE15_F  PRESOE15_M  P3YM_HLI  P3YM_HLI_F  P3YM_HLI_M  P3HLINHE  P3HLINHE_F  P3HLINHE_M  P3HLI_HE  P3HLI_HE_F  P3HLI_HE_M  P5_HLI  P5_HLI_NHE  P5_HLI_HE  PHOG_IND  POB_AFRO  POB_AFRO_F  POB_AFRO_M  PCON

In [None]:
# Consulta 10: Distritos de la entidad '15' en cat_distrito_2020
index_name = "cat_distrito_2020"
cve_entidad_buscada = "15"

query_body = {
    "query": {
        "term": {
            "CVE_ENT": cve_entidad_buscada
        }
    },
    "size": 5
}

print(f"Consulta 10: Buscando distritos de CVE_ENT '{cve_entidad_buscada}' en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [None]:
# Consulta 11: Entidades con "Baja" en NOM_ENT en ine_entidad_2020
index_name = "ine_entidad_2020"
palabra_buscada = "Baja"

query_body = {
    "query": {
        "match": {
            "NOM_ENT ": palabra_buscada
        }
    },
    "size": 5
}

print(f"Consulta 11: Buscando entidades con '{palabra_buscada}' en su nombre en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [None]:
# Consulta 12: Entidades con POBTOT > 5,000,000 en ine_distrito_2020 (descendiente)
index_name = "ine_entidad_2020"
poblacion_minima = 5000000

query_body = {
    "query": {
        "range": {
            "POBTOT": {
                "gt": poblacion_minima
            }
        }
    },
    "sort": [{"POBTOT": "desc"}],
    "size": 10
}

print(f"Consulta 12: Buscando distritos con POBTOT > {poblacion_minima} en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [None]:
# Consulta 13: Obtener 3 documentos de muestra de cat_distrito_2020
index_name = "cat_distrito_2020"

query_body = {
    "query": {
        "match_all": {} # Coincide con todos los documentos
    },
    "size": 3
}

print(f"Consulta 13: Obteniendo 3 documentos de muestra de {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [None]:
# Consulta 14: Entidades con claves '01', '09', '15' en ine_entidad_2020
index_name = "ine_entidad_2020"
claves_entidad_buscadas = ["1", "7", "16"]

query_body = {
    "query": {
        "terms": {
            "ENT": claves_entidad_buscadas
        }
    },
    "size": len(claves_entidad_buscadas)
}

print(f"Consulta 14: Buscando entidades con claves {claves_entidad_buscadas} en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)


In [None]:
# Consulta 15: Las 5 entidades con mayor POBTOT en ine_entidad_2020
index_name = "ine_entidad_2020"

query_body = {
    "query": {
        "match_all": {}
    },
    "sort": [
        {
            "POBTOT": {
                "order": "desc"
            }
        }
    ],
    "size": 5
}

print(f"Consulta 15: Buscando las 5 entidades con mayor población en {index_name}...")
response = es.search(index=index_name, body=query_body)
display_results(response)
