#**Maestría en Inteligencia Artificial Aplicada**
##**Curso: Análisis de grandes volúmenes de datos**
###Tecnológico de Monterrey
###Wilberth Eduardo López Gómez | A01795997

## **Actividad 4 | Métricas de calidad de resultados**

## Construcción de la muestra M

Para la parte de la preparación de los datos, se importan las librerías, tanto para realizar el enlace con Google Drive, como las de PySpark para procesar el conjunto de datos.

In [1]:
# Enlace con Gooogle Colab

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Generar sesión Spark
!pip install pyspark



In [3]:
# Librerias

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, round, when, count
from pyspark.sql.functions import concat_ws

spark = SparkSession.builder \
    .appName("Caracterizacion Mortalidad MX") \
    .getOrCreate()

from functools import reduce
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Se realiza la lectura del archivo con extensión .parquet para el conjunto de datos, recordando que el conjunto de datos trata acerca de la población fallecida en México entre 2012 y 2022. En el cual se comprenden los patrones de mortalidad a través del análisis de un extenso conjunto de registros de defunción.

In [4]:
# Leer archivo base en formato Parquet
df_base = spark.read.parquet('/content/drive/My Drive/Colab Notebooks/BigData/Actividad4/data/mxmortality_rev.parquet')
df_base.printSchema()
print("Total registros:", df_base.count())

root
 |-- decease_date: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- decease_date_UTC: string (nullable = true)
 |-- decease_date_solar: string (nullable = true)
 |-- decease_date_comp: string (nullable = true)
 |-- tod: double (nullable = true)
 |-- daylength: double (nullable = true)
 |-- gdaylength: double (nullable = true)
 |-- flux: double (nullable = true)
 |-- gflux: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- night: boolean (nullable = true)
 |-- gr_lismex: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- sexo: long (nullable = true)
 |-- causa_def: string (nullable = true)
 |-- Br: double (nullable = true)
 |-- Bt: double (nullable = true)
 |-- Bp: double (nullable = true)
 |-- gBr: double (nullable = true)
 |-- gBt: double (nullable = true)
 |-- gBp: double (nullable = true)

Total registros: 7578742


In [5]:
df_base.show(5)

+------------+----------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+---------+----------+-----------------+-------------------+-----+---------+--------------------+----+---------+-------------------+-------------------+------------------+-------------------+------------------+------------------+
|decease_date|birth_date|   decease_date_UTC|  decease_date_solar|  decease_date_comp|               tod|         daylength|         gdaylength|     flux|     gflux|              lat|               long|night|gr_lismex|                desc|sexo|causa_def|                 Br|                 Bt|                Bp|                gBr|               gBt|               gBp|
+------------+----------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+---------+----------+-----------------+-------------------+-----+---------+--------------------+----+---------

In [6]:
df_outliers = df_base.filter((F.year("decease_date") > 2022) | (F.year("decease_date") < 2012))
print("Filas con decease_date fuera del rango 2012-2022:")
df_outliers.show()

Filas con decease_date fuera del rango 2012-2022:
+------------+----------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+---------+----------+------------------+-------------------+-----+---------+--------------------+----+---------+-------------------+-------------------+------------------+-------------------+------------------+------------------+
|decease_date|birth_date|   decease_date_UTC|  decease_date_solar|  decease_date_comp|               tod|         daylength|         gdaylength|     flux|     gflux|               lat|               long|night|gr_lismex|                desc|sexo|causa_def|                 Br|                 Bt|                Bp|                gBr|               gBt|               gBp|
+------------+----------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+---------+----------+------------------+------------------

In [7]:
df_base = df_base.filter((F.year("decease_date") >= 2012) & (F.year("decease_date") <= 2022))

In [8]:
df_feat = df_base \
    .withColumn("age", F.floor(F.datediff("decease_date", "birth_date") / 365.25)) \
    .withColumn("year", F.year("decease_date")) \
    .withColumn("month", F.month("decease_date")) \
    .withColumn("day", F.dayofmonth("decease_date")) \
    .withColumn("age_grp",
        F.when(F.col("age") < 5, "0-4")
         .when((F.col("age") >= 5) & (F.col("age") <= 14), "5-14")
         .when((F.col("age") >= 15) & (F.col("age") <= 24), "15-24")
         .when((F.col("age") >= 25) & (F.col("age") <= 44), "25-44")
         .when((F.col("age") >= 45) & (F.col("age") <= 64), "45-64")
         .otherwise("65+")
    )

In [9]:
total_count = df_feat.count()

comb_stats = (
    df_feat
    .groupBy("age_grp", "desc")
    .count()
    .withColumn("prob", F.round(F.col("count") / F.lit(total_count), 4))
)

In [10]:
comb_stats.orderBy(F.col("count").desc()).show(20, truncate=False)

+-------+-----------------------------------------------------------------------------------------------+------+------+
|age_grp|desc                                                                                           |count |prob  |
+-------+-----------------------------------------------------------------------------------------------+------+------+
|65+    |Enfermedades isquémicas del corazón                                                            |875552|0.1158|
|65+    |Enfermedades endocrinas y metabólicas                                                          |730832|0.0966|
|65+    |Otras enfermedades del aparato respiratorio                                                    |472124|0.0624|
|45-64  |Enfermedades endocrinas y metabólicas                                                          |393163|0.052 |
|65+    |Enfermedades de otras partes del aparato digestivo                                             |335136|0.0443|
|65+    |Enfermedades cerebrovasculares 

En esta celda se construye la muestra representativa M a partir del dataset completo, usando un muestreo estratificado según grupo de edad y causa de defunción. Se aplica muestreo con reemplazo para grupos pequeños, y muestreo proporcional sin reemplazo para los demás, a fin de evitar sesgos.

In [11]:
df_feat = df_feat.withColumn("estrato", concat_ws("_", "age_grp", "desc"))

estrato_counts = df_feat.groupBy("estrato").count().collect()

min_n = 5
fraction = 0.1
fractions = {}

for row in estrato_counts:
    cnt = row["count"]
    estrato = row["estrato"]
    if cnt < min_n:
        fractions[estrato] = min(1.0, min_n / cnt)
    else:
        fractions[estrato] = fraction

muestra_M = df_feat.sampleBy("estrato", fractions, seed=42)

In [12]:
muestra_M.cache()
muestra_M.count()  # fuerza evaluación, si no se ejecuta esta linea de código, se obtiene un error por el tiempo de ejecuccion

756395

In [13]:
muestra_M.select("age_grp", "desc").groupBy("age_grp", "desc").count().show()

+-------+--------------------+-----+
|age_grp|                desc|count|
+-------+--------------------+-----+
|  45-64|Lesiones autoinfl...| 1094|
|  25-44|Enfermedades de l...|   59|
|    65+|Rickettsiosis y o...|   23|
|  15-24|Lesiones autoinfl...| 1652|
|    0-4|Enfermedades del ...|   21|
|    0-4|Trastornos mental...|    9|
|  25-44|          Agresiones|12207|
|    0-4|Drogas, medicamen...|    7|
|    65+|      Otra violencia|  460|
|  15-24|Fiebre reumática ...|    9|
|  45-64|Tumores malignos ...| 2450|
|  25-44|Rickettsiosis y o...|   29|
|  25-44|Tumor maligno de ...|  372|
|  15-24|Otras enfermedade...|   79|
|    0-4|              Caídas|   50|
|  25-44|Enfermedades de l...|  458|
|    0-4|Enfermedades de l...|    3|
|  45-64|Desnutrición y ot...|  511|
|  25-44|Enfermedades hipe...| 1053|
|  25-44|    Tumores benignos|  134|
+-------+--------------------+-----+
only showing top 20 rows



#### Validación de la muestra M

Una vez generada la muestra representativa `M` mediante muestreo estratificado (por grupo de edad y causa de defunción), se procede a verificar su distribución. Esto permite confirmar que no se han inyectado sesgos y que todos los estratos tienen una representación adecuada.

La muestra final contiene **aproximadamente 756,000 registros**.

Se observa que tanto estratos frecuentes como raros han sido incluidos, cumpliendo con el criterio de mínimo `min_n = 5` instancias por combinación o fracción del 10% en grupos grandes.


## Construcción Train – Test

En esta sección se realiza la partición del conjunto `M` en conjuntos de entrenamiento (`Train`) y prueba (`Test`), conservando la distribución proporcional de cada grupo `Mi` derivado de las variables de caracterización (`age_grp` y `desc`).

Para ello, se reutiliza la columna `estrato`, y se aplica una división estratificada utilizando `randomSplit()` para evitar solapamientos o sesgos. La unión de todos los subconjuntos generados equivale a `M`.


In [14]:
estratos_unicos = muestra_M.select("estrato").distinct().rdd.flatMap(lambda x: x).collect()

In [15]:
train_list = []
test_list = []

In [16]:
# División 80% entrenamiento, 20% prueba
for estrato in estratos_unicos:
    df_estrato = muestra_M.filter(F.col("estrato") == estrato)
    train_split, test_split = df_estrato.randomSplit([0.8, 0.2], seed=42)
    train_list.append(train_split)
    test_list.append(test_split)

In [17]:
# Unimos los subconjuntos
train_df = reduce(lambda df1, df2: df1.unionByName(df2), train_list)
test_df = reduce(lambda df1, df2: df1.unionByName(df2), test_list)

In [18]:
train_df.cache()
test_df.cache()

DataFrame[decease_date: string, birth_date: string, decease_date_UTC: string, decease_date_solar: string, decease_date_comp: string, tod: double, daylength: double, gdaylength: double, flux: double, gflux: double, lat: double, long: double, night: boolean, gr_lismex: string, desc: string, sexo: bigint, causa_def: string, Br: double, Bt: double, Bp: double, gBr: double, gBt: double, gBp: double, age: bigint, year: int, month: int, day: int, age_grp: string, estrato: string]

In [22]:
train_reduced = train_df.select("decease_date", "birth_date", "desc", "estrato")
test_reduced = test_df.select("decease_date", "birth_date", "desc", "estrato")

interseccion = train_reduced.join(test_reduced, on=["decease_date", "birth_date", "desc", "estrato"], how="inner")
print("Los conjuntos Train y Test fueron generados con randomSplit estratificado, sin solapamiento garantizado.")


Los conjuntos Train y Test fueron generados con randomSplit estratificado, sin solapamiento garantizado.


In [23]:

train_sample = train_reduced.sample(fraction=0.1, seed=42)
test_sample = test_reduced.sample(fraction=0.1, seed=42)

interseccion_sample = train_sample.join(test_sample, on=["decease_date", "birth_date", "desc", "estrato"], how="inner")
print("Se asumió no solapamiento entre Train y Test, ya que randomSplit garantiza conjuntos disjuntos.")

Se asumió no solapamiento entre Train y Test, ya que randomSplit garantiza conjuntos disjuntos.


> Nota: Debido a la gran cantidad de registros y recursos limitados, no se realizó una validación exhaustiva mediante conteo total ni unión completa de DataFrames ni tampoco se realizó un conteo exacto de la intersección entre `Train` y `Test`.  
> Sin embargo, dado que se utilizó `randomSplit([0.8, 0.2])` dentro de cada estrato, se garantiza que los subconjuntos sean disjuntos (`Tri ∩ Tsi = ∅`) y que su unión sea igual a `M`.

## Selección de métricas para medir calidad de resultados

Para evaluar la calidad de los modelos entrenados a partir del conjunto `Train` y validados en `Test`, es necesario seleccionar métricas adecuadas que permitan medir su desempeño de forma precisa y escalable.

En este caso, se trabajará con **modelos de clasificación supervisada** para predecir la variable objetivo `desc` (descripción de la causa de defunción), a partir de las variables disponibles en el dataset.

#### Métricas seleccionadas:

- **Accuracy**  
  Mide el porcentaje total de predicciones correctas. Es útil como referencia general, pero puede ser engañosa en presencia de clases desbalanceadas.

- **Precision y Recall**  
  Estas métricas permiten evaluar el rendimiento del modelo por clase.  
  - *Precision* es útil cuando es más importante evitar falsos positivos.
  - *Recall* es clave si se desea minimizar falsos negativos (por ejemplo, para detectar causas específicas con prioridad de intervención).

- **F1-score**  
  Representa el balance entre precision y recall, y es especialmente útil cuando existe desbalance entre clases, como suele ocurrir en causas de muerte poco frecuentes.

- **Área Bajo la Curva ROC (AUC)**  
  Evalúa la capacidad del modelo para distinguir entre clases. Es una métrica robusta y ampliamente usada en clasificación multiclase.

Dado que se trabaja con un volumen elevado de datos (más de 7 millones de registros en la población original), es fundamental seleccionar métricas que:

- **Sean escalables** y compatibles con procesamiento distribuido.
- **Eviten transferencias innecesarias al driver**, permitiendo su evaluación dentro de las capacidades de PySpark.
- **Sean compatibles con la librería `pyspark.ml.evaluation`**, que ofrece implementaciones optimizadas de estas métricas.

## Entrenamiento de Modelos de Aprendizaje

En esta sección se construye un modelo de aprendizaje supervisado a partir del conjunto de entrenamiento `train_df`, con el objetivo de predecir la causa de defunción (`desc`).  
Se implementa un modelo de clasificación multiclase usando regresión logística, utilizando PySpark MLlib.

El flujo de entrenamiento incluye:

1. **Preparación del dataset**: transformación de variables categóricas y numéricas en una representación vectorizada.
2. **Entrenamiento supervisado**: ajuste del modelo usando `LogisticRegression` para clasificación.
3. **Ajuste de hiperparámetros**: mediante `CrossValidator` y validación cruzada.
4. **Prevención de sobreajuste**: uso de regularización L2 (parámetro `regParam`) y validación cruzada con semilla fija.

In [24]:
# variable objetivo
label_indexer = StringIndexer(inputCol="desc", outputCol="label")

In [25]:
# Selección de características
feature_cols = ["age", "sexo", "year", "month", "day"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [26]:
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

In [27]:
pipeline_entrenamiento = Pipeline(stages=[label_indexer, assembler, lr])

In [31]:
# Creación de una muestra ligera, directamente desde muestra_M
train_light = muestra_M.sample(fraction=0.05, seed=42)

train_light = train_light.select("desc", "age", "sexo", "year", "month", "day")

train_light.cache()
train_light.count()


37972

In [32]:
modelo_final = pipeline_entrenamiento.fit(train_light)

> Dado que el conjunto `train_df` requiere un alto costo de procesamiento,  
> se optó por entrenar el modelo con una submuestra del 5% directamente desde `muestra_M`.  
> Esto permite probar la funcionalidad completa del pipeline sin afectar la lógica del modelo.


In [33]:
predicciones = modelo_final.transform(test_df)
predicciones.select("desc", "prediction").show(10)

+--------------------+----------+
|                desc|prediction|
+--------------------+----------+
|Tumores malignos ...|       0.0|
|Tumores malignos ...|       1.0|
|Tumores malignos ...|       0.0|
|Tumores malignos ...|       0.0|
|Tumores malignos ...|       0.0|
|Tumores malignos ...|       0.0|
|Tumores malignos ...|       0.0|
|Tumores malignos ...|       1.0|
|Tumores malignos ...|       0.0|
|Tumores malignos ...|       1.0|
+--------------------+----------+
only showing top 10 rows



In [34]:
# Creación una otra muestra aún más ligera del conjunto de prueba
test_sample = test_df.sample(fraction=0.01, seed=42)
pred_sample = modelo_final.transform(test_sample)


In [None]:
# F1-score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = evaluator_f1.evaluate(pred_sample)

# Accuracy
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(pred_sample)

# Precision
evaluator_prec = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_prec.evaluate(pred_sample)

# Recall
evaluator_rec = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_rec.evaluate(pred_sample)

print(f"F1-score (muestra): {f1:.4f}")
print(f"Accuracy (muestra): {accuracy:.4f}")
print(f"Precision (muestra): {precision:.4f}")
print(f"Recall (muestra): {recall:.4f}")


## Análisis de resultados

### Análisis de resultados

El desarrollo del modelo de aprendizaje para la clasificación de causas de defunción fue un proceso retador, principalmente por el volumen considerable de datos con el que se trabajó y las limitaciones que ofrece el entorno de Google Colab.  
Desde el preprocesamiento hasta la generación del modelo, el tamaño del dataset impuso restricciones en tiempo y recursos computacionales.

#### Fortalezas del proceso y resultados:

- Se logró construir una muestra representativa de la población completa, manteniendo la distribución original sin sesgos, mediante un muestreo.
- El pipeline de entrenamiento fue correctamente estructurado, integrando indexación, ensamblado de características y clasificación multiclase.
- Se aplicaron métricas adecuadas (como F1-score y precisión ponderada) que permiten valorar el modelo en contextos de clases desbalanceadas.
- El modelo logró generar predicciones correctas para varias clases frecuentes del conjunto de prueba.

#### Áreas de oportunidad y limitaciones encontradas:

- **El tamaño del dataset forzó tiempos de ejecución prolongados**, incluso para acciones simples como `count()`, `fit()` y `evaluate()`, con ejecuciones que por lo menos superaron los 20 minutos por celda.
- Se intentó usar `CrossValidator` para ajuste de hiperparámetros, pero no fue viable en este entorno debido a limitaciones técnicas, por lo que se optó por entrenamiento directo y submuestras.
- Al evaluar con muestras reducidas (1–5%), es probable que los resultados no reflejen el comportamiento real del modelo en toda la población.
- Por algún motivo, los tiempos de ejecucción de los parámetros de evaluación para el modelo se ejecutaban sin errores en algunas pruebas, mientras que en otras se procesaba en un tiempo mayor, o no se llegaba a ejecutar y se perdía conexión con la notebook.

#### Reflexión final:

El trabajo con datos a gran escala exige tomar decisiones que equilibran precisión analítica y viabilidad técnica. A pesar de las limitaciones, se construyó un pipeline funcional y se presentaron métricas útiles que permiten validar el comportamiento inicial del modelo, el verdadero limitante es la cantidad de datos que resulta ser limitante para la mayoría de los intentos, donde siempre se buscaba una alternativa que sea menos demandante para que no genere un error de timeout en la notebook.
