# Big Data Notebook

## Covid-19

Una empresa farmacéutica internacional está considerando realizar pruebas clínicas de una nueva vacuna/tratamiento para COVID-19 en México. Para tomar decisiones informadas sobre la ubicación y el alcance de las pruebas, la empresa necesita analizar datos epidemiológicos y demográficos relevantes. Con no menos de 1,000,000 registros.

### Adquisición y Limpieza de Datos:

   - Identificar y evaluar fuentes de datos confiables sobre COVID-19 en México.
   - Descargar y transformar datos en formatos adecuados (CSV, JSON).
   - Limpiar y preparar los datos para el análisis. Por ejemplo: fecha (mm-dd-yyyy), no ids, si no nombres.

### Ingesta de Datos en Elasticsearch:

   - Crear un índice en Elasticsearch para almacenar los datos de COVID-19.
   - Definir mappings para optimizar las búsquedas y el análisis.
   - Cargar los datos en Elasticsearch utilizando herramientas como Python, Pandas y la librería elasticsearch.
   - Definir queries básicas

## Solución

Para la solución de esta problemática encontramos un dataset de datos abiertos de la dirección de epidemiología de Mexico ([Datos](https://www.gob.mx/salud/documentos/datos-abiertos-152127)) en este dataset vamos a guardar en un df los datos cargados desde el csv y después vamos a modificar los datos para ponerlos en el formato solicitado para después guardarlo todo en un json para que después lo podamos subir a elastic search


### Importar librerias

In [1]:
from ElasticSearchProvider import ElasticSearchProvider
import json
import time
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os

## Modificar datos

### Rutas de archivos

Primeramente vamos a definir variables que se repiten mucho que nos van a ayudar con la dirección de los archivos y en las respuestas

In [1]:
json_file_path = "Datos.json"
csv_file_path = "COVID19MEXICO2020.csv"
excel_catalog_path = "Catálogos.xlsx"
RESPONSE_LITERAL = "response: "

>Descripción: Debido a que las rutas de los archivos que vamos a utilizar se repiten muchas veces vamos a declarar variables que contengan los nombres de los archivos

### Entidades y municipios

In [None]:
# Creamos un pequeño diccionario que nos va a ayudar a obtener el nombre de la entidad dependiendo de su id
ENTIDADES_DICT = {
    1: "AGUASCALIENTES",
    2: "BAJA CALIFORNIA",
    3: "BAJA CALIFORNIA SUR",
    4: "CAMPECHE",
    5: "COAHUILA DE ZARAGOZA",
    6: "COLIMA",
    7: "CHIAPAS",
    8: "CHIHUAHUA",
    9: "CIUDAD DE MÉXICO",
    10: "DURANGO",
    11: "GUANAJUATO",
    12: "GUERRERO",
    13: "HIDALGO",
    14: "JALISCO",
    15: "MÉXICO",
    16: "MICHOACÁN DE OCAMPO",
    17: "MORELOS",
    18: "NAYARIT",
    19: "NUEVO LEÓN",
    20: "OAXACA",
    21: "PUEBLA",
    22: "QUERÉTARO",
    23: "QUINTANA ROO",
    24: "SAN LUIS POTOSÍ",
    25: "SINALOA",
    26: "SONORA",
    27: "TABASCO",
    28: "TAMAULIPAS",
    29: "TLAXCALA",
    30: "VERACRUZ DE IGNACIO DE LA LLAVE",
    31: "YUCATÁN",
    32: "ZACATECAS",
    36: "ESTADOS UNIDOS MEXICANOS",
    97: "NO APLICA",
    98: "SE IGNORA",
    99: "NO ESPECIFICADO"
}

# Creamos una función que utilizando le entidad_id nos regrese el nombre de la entidad en base al diccionario de arriba
def obtener_nombre_entidad(entidad_id) -> str:
    """
    Devuelve el nombre de la entidad correspondiente al ID proporcionado.
    Si el ID no está en el diccionario, devuelve el ID original.
    """
    try:
        return ENTIDADES_DICT.get(entidad_id, entidad_id)
    except Exception as e:
        print(f"Error al obtener el nombre de la entidad para ID {entidad_id}: {e}")
        return entidad_id

# Cargar los catálogos de entidades y municipios desde el archivo Excel
entidades_df = pd.read_excel(excel_catalog_path, sheet_name="Catálogo de ENTIDADES")
municipios_df = pd.read_excel(excel_catalog_path, sheet_name="Catálogo MUNICIPIOS")

# Renombrar columnas para facilitar el merge
entidades_df.rename(columns={"CLAVE_ENTIDAD": "ENTIDAD_RES", "ENTIDAD_FEDERATIVA": "NOMBRE_ENTIDAD"}, inplace=True)
municipios_df.rename(columns={"CLAVE_ENTIDAD": "ENTIDAD_RES", "CLAVE_MUNICIPIO": "MUNICIPIO_RES", "MUNICIPIO": "NOMBRE_MUNICIPIO"}, inplace=True)

# delete ABREVIATURA column from entidades_df debido a que no lo vamos a utilizar
entidades_df.drop(columns=["ABREVIATURA"], inplace=True)

# Definir el mapeo de tipos de datos para las columnas
dtype_mapping = {
    "PAIS_NACIONALIDAD": "str",  # Columna 38
    "PAIS_ORIGEN": "str",       # Columna 39
}

# cargar el df desde el csv
df = pd.read_csv(csv_file_path, dtype=dtype_mapping)

# Hacer merge para agregar los nombres de entidad y municipio
df = df.merge(entidades_df, on="ENTIDAD_RES", how="left")
df = df.merge(municipios_df, on=["ENTIDAD_RES", "MUNICIPIO_RES"], how="left")

# Crear el campo RES como un diccionario
df["RES"] = df.apply(
    lambda row: {
        "IS_ENTIDAD": row["ENTIDAD_RES"],
        "ENTIDAD": row["NOMBRE_ENTIDAD"],
        "ID_MUNICIPIO": row["MUNICIPIO_RES"],
        "MUNICIPIO": row["NOMBRE_MUNICIPIO"]
    },
    axis=1
)

# delete the columns "ENTIDAD_RES", "MUNICIPIO_RES", "NOMBRE_ENTIDAD", "NOMBRE_MUNICIPIO"
df.drop(columns=["ENTIDAD_RES", "MUNICIPIO_RES", "NOMBRE_ENTIDAD", "NOMBRE_MUNICIPIO"], inplace=True)

# Leave only the first 1000000 rows
df = df.head(1000000)

# Save the DataFrame to a JSON file
df.to_json(json_file_path, orient="records", lines=False, indent=4)
print("Archivo JSON creado")

# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Change "ENTIDAD_UM", "ENTIDAD_NAC" to a dictionary of the id and the name of the entity
if "ENTIDAD_UM" in df.columns:
    df["ENTIDAD_UM"] = df["ENTIDAD_UM"].apply(
        lambda entidad_id: {
            "ID": entidad_id,
            "NOMBRE": obtener_nombre_entidad(int(entidad_id)) if pd.notnull(entidad_id) else None
        }
    )

if "ENTIDAD_NAC" in df.columns:
    df["ENTIDAD_NAC"] = df["ENTIDAD_NAC"].apply(
        lambda entidad_id: {
            "ID": entidad_id,
            "NOMBRE": obtener_nombre_entidad(int(entidad_id)) if pd.notnull(entidad_id) else None
        }
    )

# Guardar el DataFrame modificado en el archivo JSON
df.to_json(json_file_path, orient="records", lines=False, indent=4)
print("Archivo JSON actualizado con los cambios en 'ENTIDAD_UM' y 'ENTIDAD_NAC'")

>Descripción: Con esto cargamos los datos del csv y le hacemos algunas modificaciones básicas donde guardamos unicamente lso primeros 1´000,000 registros y también reorganizamos las entidades para que cuando se guarden en el json se guarde el id y el nombre de la entidad y para la residencia se guarda tanto la entidad como el municipio con el sus id´s, el mapping para estas modificaciones seria algo similar a esto:
```json
    "ENTIDAD_UM": {
        "properties": {
          "ID": { "type": "integer" },
          "NOMBRE": { "type": "keyword" }
        }
      },
      "ENTIDAD_NAC": {
        "properties": {
          "ID": { "type": "integer" },
          "NOMBRE": { "type": "keyword" }
        }
      },
      "RES": {
        "properties": {
          "id_entidad": { "type": "integer" },
          "entidad": { "type": "keyword" },
          "id_municipio": { "type": "integer" },
          "municipio": { "type": "keyword" }
        }
      }
```

### Modificar país origen

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'PAIS_ORIGEN' column values
if "97" in df["PAIS_ORIGEN"].values:
    df["PAIS_ORIGEN"] = 'No aplica'
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'PAIS_ORIGEN' actualizados")

>Descripción: debido a que el df que utilizamos en el campo "PAIS_ORIGEN" solo cuenta con el id 97 que significa que no aplica vamos a sustituir el id con su valor en el mismo campo

### Modificar fechas

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# List of date columns
date_columns = [
    'FECHA_ACTUALIZACION',
    'FECHA_INGRESO',
    'FECHA_SINTOMAS',
    'FECHA_DEF'
]

# Check and convert date format for each date column
for date_column in date_columns:
    if date_column in df.columns:
        # Replace invalid dates with empty strings
        if '9999-99-99' in df[date_column].values:
            df[date_column] = df[date_column].replace('9999-99-99', '')

        if not df[date_column].str.match(r'\d{2}-\d{2}-\d{4}').all():
            # Convert the date format
            df[date_column] = pd.to_datetime(df[date_column], errors='coerce').dt.strftime('%m-%d-%Y')

            # Save the modified DataFrame back to JSON
            df.to_json(json_file_path, orient="records", lines=False, indent=4)
            print(f"Formato de fecha actualizado para la columna {date_column}")

>Descripción: como se solicita tenemos que cambiar el formato de las fechas de df entonces primeramente identificamos que hay 4 campos que son de tipo fecha entonces vamos a crear un arreglo para iterar y modificar estos campos 1 por 1 y se hizo una modificación que debido a que elasticSearch no acepta fechas en formato de 99-99-9999 que indican que el paciente no tiene esta fecha pues vamos a cambiarlo por un campo vació

### Modificar resultado de laboratorio

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'RESULTADO_LAB' column values
if df["RESULTADO_LAB"].isin([1, 2, 3, 4, 97]).any():
    df["RESULTADO_LAB"] = df["RESULTADO_LAB"].replace({
        1: 'Positivo', 
        2: 'Negativo',
        3: 'Pendiente',
        4: 'Resultado no aplicable',
        97: 'No se realizó la prueba'
        })

    # Save the modified DataFrame back to JSON
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'RESULTADO_LAB' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Modificar clasificación final

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'CLASIFICACION_FINAL' column values
if df["CLASIFICACION_FINAL"].isin([1, 2, 3, 4, 5, 6, 7]).any():
    df["CLASIFICACION_FINAL"] = df["CLASIFICACION_FINAL"].replace({
        1: 'CASO DE COVID-19 CONFIRMADO POR ASOCIACIÓN CLÍNICA EPIDEMIOLÓGICA',
        2: 'CASO DE COVID-19 CONFIRMADO POR COMITÉ DE DICTAMINACIÓN',
        3: 'CASO DE SARS-COV-2 CONFIRMADO',
        4: 'INVÁLIDO POR LABORATORIO',
        5: 'NO REALIZADO POR LABORATORIO',
        6: 'CASO SOSPECHOSO',
        7: 'NEGATIVO A SARS-COV-2',
        })

    # Save the modified DataFrame back to JSON
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'CLASIFICACION_FINAL' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Modificar sexo

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'SEXO' column values
if df["SEXO"].isin([1, 2, 99]).any():
    df["SEXO"] = df["SEXO"].replace({2: 'Hombre', 1: 'Mujer', 99: 'No Especificado'})

    # Save the modified DataFrame back to JSON
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'SEXO' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Modificar le tipo de paciente

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'TIPO_PACIENTE' column values
if df["TIPO_PACIENTE"].isin([1, 2, 99]).any():
    df["TIPO_PACIENTE"] = df["TIPO_PACIENTE"].replace({1: 'Ambulatorio', 2: 'Hospitalizado', 99: 'No Especificado'})

    # Save the modified DataFrame back to JSON
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'TIPO_PACIENTE' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Modificaciones de origen

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'ORIGEN' column values
if df["ORIGEN"].isin([1, 2, 99]).any():
    df["ORIGEN"] = df["ORIGEN"].replace({1: 'USMER', 2: 'Fuera de USMER', 99: 'No Especificado'})

    # Save the modified DataFrame back to JSON
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'ORIGEN' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Modificaciones de sector

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'SECTOR' column values
if df["SECTOR"].isin([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 99]).any():
    df["SECTOR"] = df["SECTOR"].replace({
        1: 'CRUZ ROJA',
        2: 'DIF',
        3: 'ESTATAL',
        4: 'IMSS',
        5: 'IMSS-BIENESTAR',
        6: 'ISSSTE',
        7: 'MUNICIPAL',
        8: 'PEMEX',
        9: 'PRIVADA',
        10: 'SEDENA',
        11: 'SEMAR',
        12: 'SSA',
        13: 'UNIVERSITARIO',
        14: 'CIJ',
        15: 'IMSS Bienestar OPD',
        99: 'No Especificado'
    })

    # Save the modified DataFrame back to JSON
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'SECTOR' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Modificaciones de nacionalidad

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

# Check and convert 'Nacionalidad' column values
if df["NACIONALIDAD"].isin([1, 2, 99]).any():
    df["NACIONALIDAD"] = df["NACIONALIDAD"].replace({1: 'Mexicana', 2: 'Extranjera', 99: 'No Especificado'})

    # Save the modified DataFrame back to JSON
    df.to_json(json_file_path, orient="records", lines=False, indent=4)
    print("Valores de 'NACIONALIDAD' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Modificaciones de columnas estándar

In [None]:
# Load the JSON file into a DataFrame
df = pd.read_json(json_file_path, orient="records", lines=False)

Columnas_estándar = [
    "INTUBADO",
    "NEUMONIA",
    "EMBARAZO",
    "HABLA_LENGUA_INDIG",
    "INDIGENA",
    "DIABETES",
    "EPOC",
    "ASMA",
    "INMUSUPR",
    "HIPERTENSION",
    "OTRA_COM",
    "CARDIOVASCULAR",
    "OBESIDAD",
    "RENAL_CRONICA",
    "TABAQUISMO",
    "OTRO_CASO",
    "TOMA_MUESTRA_LAB",
    "TOMA_MUESTRA_ANTIGENO",
    "MIGRANTE",
    "UCI"
]

for columna in Columnas_estándar:
    if df[columna].isin([1, 2, 97, 98, 99]).any():
        df[columna] = df[columna].replace({1: 'Sí', 2: 'No', 97: 'No aplica', 98: 'Se ignora', 99: 'No especificado'})

        # Save the modified DataFrame back to JSON
        df.to_json(json_file_path, orient="records", lines=False, indent=4)
        print(f"Valores de '{columna}' actualizados")

>Descripción: simplemente remplazamos los id´s por su valores correspondientes

### Resultado

Como resultado vamos a obtener un archivo json con un formato como el del siguiente ejemplo:

```Json
[
    {
        "FECHA_ACTUALIZACION":"10-31-2021",
        "ID_REGISTRO":"z4d6fe",
        "ORIGEN":"USMER",
        "SECTOR":"ISSSTE",
        "ENTIDAD_UM":{
            "ID":24,
            "NOMBRE":"SAN LUIS POTOS\u00cd"
        },
        "SEXO":"Mujer",
        "ENTIDAD_NAC":{
            "ID":24,
            "NOMBRE":"SAN LUIS POTOS\u00cd"
        },
        "TIPO_PACIENTE":"Ambulatorio",
        "FECHA_INGRESO":"12-06-2020",
        "FECHA_SINTOMAS":"11-30-2020",
        "FECHA_DEF":null,
        "INTUBADO":"No aplica",
        "NEUMONIA":"No",
        "EDAD":55,
        "NACIONALIDAD":"Mexicana",
        "EMBARAZO":"No",
        "HABLA_LENGUA_INDIG":"No",
        "INDIGENA":"No",
        "DIABETES":"S\u00ed",
        "EPOC":"No",
        "ASMA":"No",
        "INMUSUPR":"No",
        "HIPERTENSION":"No",
        "OTRA_COM":"No",
        "CARDIOVASCULAR":"No",
        "OBESIDAD":"No",
        "RENAL_CRONICA":"No",
        "TABAQUISMO":"No",
        "OTRO_CASO":"No",
        "TOMA_MUESTRA_LAB":"S\u00ed",
        "RESULTADO_LAB":"Positivo",
        "TOMA_MUESTRA_ANTIGENO":"No",
        "RESULTADO_ANTIGENO":"No se realiz\u00f3 la prueba",
        "CLASIFICACION_FINAL":"CASO DE SARS-COV-2 CONFIRMADO",
        "MIGRANTE":"No especificado",
        "PAIS_NACIONALIDAD":"M\u00e9xico",
        "PAIS_ORIGEN":"No aplica",
        "UCI":"No aplica",
        "RES":{
            "id_entidad":24,
            "entidad":"SAN LUIS POTOS\u00cd",
            "id_municipio":28,
            "municipio":"SAN LUIS POTOS\u00cd"
        }
    }
]
```

## Cargar los datos a Elastic Search

Después de ya tener nuestro json en el formato deseado vamos a cargar estos datos a Elastic Search

### Verificar conexión

In [None]:
try:
        # Create an instance of the ElasticSearchProvider class
        # and establish a connection with the ElasticSearch server
        es_handler = ElasticSearchProvider(index="covid-19")
        print("es_handler: ", es_handler, "\n")
        
except Exception as e:
        print(f"An error occurred: {e}")

>Descripción: Solo comprobamos la conexión con el constructor de la clase de ElasticSearchProvider() y le pasamos el nombre de nuestro indice

Constructor de ElasticSearchProvider:

```Python
def __init__(self, index="person"):
    self.host = "http://localhost:9200"
    #   self.user = str(user)
    #   self.password = str(password)
    self.index = index
    self.index_type = "_doc"
    # Configurar timeout, re-intentos y retry_on_timeout
    self.connection = Elasticsearch(
        self.host,
        timeout=30,  # Tiempo de espera en segundos
        max_retries=3,  # Número máximo de re-intentos
        retry_on_timeout=True  # Re-intentar si hay un timeout
    )
```


### Cargar el mapping de nuestro indice

Covid-19_mapping.json:
```Json
{
  "mappings": {
    "properties": {
      "FECHA_ACTUALIZACION": { "type": "date", "format": "MM-dd-yyyy" },
      "ID_REGISTRO": { "type": "keyword" },
      "ORIGEN": { "type": "keyword" },
      "SECTOR": { "type": "keyword" },
      "ENTIDAD_UM": {
        "properties": {
          "ID": { "type": "integer" },
          "NOMBRE": { "type": "keyword" }
        }
      },
      "SEXO": { "type": "keyword" },
      "ENTIDAD_NAC": {
        "properties": {
          "ID": { "type": "integer" },
          "NOMBRE": { "type": "keyword" }
        }
      },
      "TIPO_PACIENTE": { "type": "keyword" },
      "FECHA_INGRESO": { "type": "date", "format": "MM-dd-yyyy" },
      "FECHA_SINTOMAS": { "type": "date", "format": "MM-dd-yyyy" },
      "FECHA_DEF": {
        "type": "date",
        "format": "MM-dd-yyyy",
        "null_value": null
      },
      "INTUBADO": { "type": "keyword" },
      "NEUMONIA": { "type": "keyword" },
      "EDAD": { "type": "integer" },
      "NACIONALIDAD": { "type": "keyword" },
      "EMBARAZO": { "type": "keyword" },
      "HABLA_LENGUA_INDIG": { "type": "keyword" },
      "INDIGENA": { "type": "keyword" },
      "DIABETES": { "type": "keyword" },
      "EPOC": { "type": "keyword" },
      "ASMA": { "type": "keyword" },
      "INMUSUPR": { "type": "keyword" },
      "HIPERTENSION": { "type": "keyword" },
      "OTRA_COM": { "type": "keyword" },
      "CARDIOVASCULAR": { "type": "keyword" },
      "OBESIDAD": { "type": "keyword" },
      "RENAL_CRONICA": { "type": "keyword" },
      "TABAQUISMO": { "type": "keyword" },
      "OTRO_CASO": { "type": "keyword" },
      "TOMA_MUESTRA_LAB": { "type": "keyword" },
      "RESULTADO_LAB": { "type": "keyword" },
      "TOMA_MUESTRA_ANTIGENO": { "type": "keyword" },
      "RESULTADO_ANTIGENO": { "type": "keyword" },
      "CLASIFICACION_FINAL": { "type": "keyword" },
      "MIGRANTE": { "type": "keyword" },
      "PAIS_NACIONALIDAD": { "type": "keyword" },
      "PAIS_ORIGEN": { "type": "keyword" },
      "UCI": { "type": "keyword" },
      "RES": {
        "properties": {
          "id_entidad": { "type": "integer" },
          "entidad": { "type": "keyword" },
          "id_municipio": { "type": "integer" },
          "municipio": { "type": "keyword" }
        }
      }
    }
  }
}
```

In [None]:
try:
    with ElasticSearchProvider(index="covid-19") as es:
        
        # Load the json of the mapping of the index into a dictionary
        mapping_data = pd.read_json("./Proyecto Unidad I/Covid-19_mapping.json")
        mapping = mapping_data.to_dict()

        response = es.create_index(mapping=mapping)
        print(f"{RESPONSE_LITERAL} {json.dumps(response.body if hasattr(response, 'body') else response, indent=4)}\n")

        response = es.get_mapping()
        print(f"{RESPONSE_LITERAL} {json.dumps(response.body if hasattr(response, 'body') else response, indent=4)}\n")
        
except Exception as e:
    print(f"an error occurred: {e}")

>Descripción: con esto cargamos el mapping a nuestro indice de Elastic Search

Función create_index(mapping) y get_mapping() de la clase ElasticSearchProvider:

```Python
def create_index(self, mapping):
    try:
        if not self.connection.indices.exists(index=self.index):
            response = self.connection.indices.create(index=self.index, body=mapping)
        else:
            response = {
                "StatusCode": 400,
                "body": json.dumps({
                    "message": f"Index {self.index} already exists"
                })
            }
        return response
    except Exception as e:
        return {
            "StatusCode": 500,
            "body": json.dumps({
                "message": str(e)
                })
        }

def get_mapping(self):
    try:
        response = self.connection.indices.get_mapping(index=self.index)
        return response
    except Exception as e:
        return {
            "StatusCode": 500,
            "body": json.dumps({
                "message": str(e)
                })
        }
```

### Cargar los datos a nuestro indice

In [None]:
try:
    with ElasticSearchProvider(index="covid-19") as es:
        
        response = es.load_json_file(json_file_path, batch_size=10000)
        print(f"{RESPONSE_LITERAL} {json.dumps(response.body if hasattr(response, 'body') else response, indent=4)}\n")
        
except Exception as e:
    print(f"an error occurred: {e}")

>Descripción: Con esta función cargamos en bloques de 10,000 los datos a nuestro servidor de Elastic Search

Función load_json_file(json_file_path, batch_size) de ElasticSearchProvider:
```Python
def load_json_file(self, file_path, batch_size=1000):
    try:
        # Leer el archivo JSON usando pandas
        df = pd.read_json(file_path, orient="records", lines=False)

        # Convertir el DataFrame a una lista de diccionarios
        documents = df.to_dict(orient="records")
        
        # Dividir los documentos en lotes
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            
            # Preparar los datos para la operación bulk
            bulk_data = [
                {"_index": self.index, "_source": doc}
                for doc in batch
            ]
            
            # Insertar los documentos en Elasticsearch
            helpers.bulk(self.connection, bulk_data)
        
        return f"{len(documents)} documents inserted in {self.index}"
    
    except ValueError as e:
        return {
            "StatusCode": 400,
            "body": json.dumps({
                "message": f"Error reading JSON with pandas: {str(e)}"
            })
        }
    except FileNotFoundError as e:
        return {
            "StatusCode": 404,
            "body": json.dumps({
                "message": f"File not found: {str(e)}"
            })
        }
    except ConnectionError as e:
        return {
            "StatusCode": 500,
            "body": json.dumps({
                "message": f"Connection error: {str(e)}"
            })
        }
    except Exception as e:
        return {
            "StatusCode": 500,
            "body": json.dumps({
                "message": f"An error occurred: {str(e)}"
            })
        }
```

### Comprobar que se subió exitosamente y una consulta

In [None]:
try:
    with ElasticSearchProvider(index="covid-19") as es:
        
        # Search for documents in the index
        print("Search Covid-19 Documents Response:")
        response = es.get_all_documents()
        print(f"{RESPONSE_LITERAL} {json.dumps(response.body if hasattr(response, 'body') else response, indent=4)}\n")
        
except Exception as e:
    print(f"an error occurred: {e}")

>Descripción: con esta función obtenemos de nuestro indice todos los registros o al menos los primeros 10,000 debido a que Elastic Search no te muestra mas de 10,000 registros a la vez 

Función get_all_documents() de ElasticSearchProvider:

```Python
def get_all_documents(self):
    try:
        response = self.connection.search(index=self.index, body={"query": {"match_all": {}}})
        return response
    except Exception as e:
        return {
            "StatusCode": 500,
            "body": json.dumps({
                "message": str(e)
                })
        }
```

In [None]:
try:
    with ElasticSearchProvider(index="covid-19") as es:
        
        size = 32
        response = es.search_document(query={
            "size": 0,
            "aggs": {
                "entidades": {
                    "terms": {
                        "field": "ENTIDAD_UM.NOMBRE",
                        "size": size
                    },
                    "aggs": {
                        "nombre_entidad": {
                            "top_hits": {
                                "size": 1,
                                "_source": {
                                    "includes": ["ENTIDAD_UM.ID"]
                                }
                            }
                        }
                    }
                }
            }
        })

        # Process the response to extract data for the DataFrame
        if hasattr(response, 'body'):
            response_data = response.body
        else:
            response_data = response
        
        # Extract buckets from the aggregation
        buckets = response_data.get("aggregations", {}).get("entidades", {}).get("buckets", [])
        data = []
        for bucket in buckets:
            entidad_nombre = bucket.get("key")
            entidad_id = bucket.get("nombre_entidad", {}).get("hits", {}).get("hits", [{}])[0].get("_source", {}).get("ENTIDAD_UM.ID")
            data.append({"Entidad Nombre": entidad_nombre, "Entidad ID": entidad_id})

        # Create a pandas DataFrame
        df = pd.DataFrame(data)
        print("DataFrame created from search_document response:")
        print(df)

        print(f"{RESPONSE_LITERAL} {json.dumps(response_data, indent=4)}\n")
        
except Exception as e:
    print(f"an error occurred: {e}")

>Descripción: Con esto obtenemos la cantidad de registros que tiene cada entidad junto con el id de la entidad

Explicación del query:

   1. Tamaño del resultado (size: 0): Indica que no se quieren recuperar documentos como tal, sino que se busca obtener únicamente los resultados de las agregaciones.

   2. Agregaciones (aggs): Se usan para obtener información agrupada de los datos.

```Json
    "size": 0,
    "aggs": {
        "entidades": {
            "terms": {
                "field": "ENTIDAD_UM.NOMBRE",
                "size": size
            },
            "aggs": {
                "nombre_entidad": {
                    "top_hits": {
                        "size": 1,
                        "_source": {
                            "includes": ["ENTIDAD_UM.ID"]
                        }
                    }
                }
            }
        }
    }
```

   1. Agrupación por ENTIDAD_UM.NOMBRE
      - Se usa la agregación terms sobre el campo "ENTIDAD_UM.NOMBRE", lo que significa que se agruparán los documentos según el nombre de la entidad de unidad médica (ENTIDAD_UM).

      - El parámetro "size": size define el número máximo de grupos a devolver. Este valor debe estar definido en la variable size.
   2. Subagregación top_hits
      - Una vez que se han agrupado los documentos por nombre de entidad, se agrega una subagregación top_hits, que selecciona hasta un documento por grupo ("size": 1).

      - Solo se extrae el campo "ENTIDAD_UM.ID", es decir, el identificador de la entidad de unidad médica correspondiente al nombre.