# **Actividad 3 | Aprendizaje supervisado y no supervisado**
### **Aprendizaje supervisado**
El aprendizaje supervisado requiere datos de entrada y salida etiquetados durante la fase de entrenamiento del ciclo de vida del machine learning. Estos datos a menudo son etiquetados por un científico de datos en la fase de preparación - una de las tareas que hace un data scientist - antes de usarse para entrenar y probar el modelo.

Se llama aprendizaje supervisado porque al menos parte de este modelo requiere supervisión humana. La gran mayoría de los datos disponibles son datos brutos sin etiquetar. Por lo general, se requiere la interacción humana para etiquetar con precisión los datos. Naturalmente, puede ser un proceso intensivo en recursos, ya que se necesitan grandes conjuntos de datos etiquetados.

### **Aprendizaje no supervisado**
El aprendizaje no supervisado es el entrenamiento de modelos de datos sin procesar y sin etiquetar. Como el nombre indica, el aprendizaje automático no supervisado no necesita tanta intervención humana comparado con el aprendizaje supervisado. Una persona tiene que establecer los parámetros del modelo, como la cantidad de puntos de clúster, pero el modelo es capaz de procesar grandes conjuntos de datos de manera efectiva y sin supervisión humana.

In [None]:
 #Se instala y configura el entorno necesario para usar PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
# Se establece la ruta de Java (JAVA_HOME).
# Se establece la ruta de Spark (SPARK_HOME), que debe coincidir con la carpeta descomprimida.
# Inicializamos findspark para que Python pueda encontrar e iniciar Spark.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"  # Esta carpeta aparece solo si descomprimes bien
import findspark
findspark.init()

In [None]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
# Se importan las librerías necesarias:
# - findspark: para inicializar Spark en el entorno Python.
# - SparkSession: para crear la sesión principal de trabajo en PySpark.
# - functions (como F): para aplicar funciones sobre columnas en DataFrames.
# - pandas y display: para mostrar datos de forma tabular si se necesita.
# - funciones adicionales (col, when, count, isnan, year, month, datediff, lit, floor, concat_ws):
#   para manipular y transformar columnas en los DataFrames.
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd
from IPython.display import display
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql.functions import col, year, month, when, datediff, lit, floor, concat_ws

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [41]:
# Carga el archivo CSV 'mxmortality_rev.csv' desde Google Drive como un DataFrame de PySpark:
# - option("header", True): indica que el archivo tiene encabezados.
# - option("inferSchema", True): permite que PySpark detecte automáticamente los tipos de datos.
df = spark.read.option("header", True).option("inferSchema", True).csv("/content/drive/MyDrive/Análisis de grandes volúmenes de datos (Gpo 10)/mxmortality_rev.csv")

# Verificar esquema y primeros registros
df.printSchema()
# Mostrar los primeros 5 registros en formato tabular (Pandas)
df.limit(5).toPandas()

root
 |-- decease_date: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- decease_date_UTC: string (nullable = true)
 |-- decease_date_solar: string (nullable = true)
 |-- decease_date_comp: string (nullable = true)
 |-- tod: double (nullable = true)
 |-- daylength: double (nullable = true)
 |-- gdaylength: double (nullable = true)
 |-- flux: double (nullable = true)
 |-- gflux: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- night: boolean (nullable = true)
 |-- gr_lismex: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- sexo: integer (nullable = true)
 |-- causa_def: string (nullable = true)
 |-- Br: double (nullable = true)
 |-- Bt: double (nullable = true)
 |-- Bp: double (nullable = true)
 |-- gBr: double (nullable = true)
 |-- gBt: double (nullable = true)
 |-- gBp: double (nullable = true)



Unnamed: 0,decease_date,birth_date,decease_date_UTC,decease_date_solar,decease_date_comp,tod,daylength,gdaylength,flux,gflux,...,gr_lismex,desc,sexo,causa_def,Br,Bt,Bp,gBr,gBt,gBp
0,2011-12-27,1929-01-06,2011-12-27 22:35:00,2011-12-27 17:25:31.601437,2011-12-27 16:35:00,16.583333,11.615353,-0.002079,395.35355,278.81348,...,28,Enfermedades isquémicas del corazón,1,I259,-58363.848591,-4463.77706,359.544623,-0.008944,0.00917,0.002491
1,2011-09-29,1939-01-21,2011-09-30 01:55:00,2011-09-29 20:32:47.989676,2011-09-29 19:55:00,19.916667,12.708724,0.021519,0.0,0.0,...,30,Enfermedades cerebrovasculares,1,I619,-58410.788115,-4570.400751,359.993905,-0.008894,0.00916,0.002486
2,2011-09-29,1925-04-25,2011-09-30 02:20:00,2011-09-29 20:58:45.808833,2011-09-29 20:20:00,20.333333,12.709252,0.021184,0.0,0.0,...,33,Otras enfermedades del aparato respiratorio,1,J64X,-58382.690832,-4444.202575,364.914014,-0.008725,0.009187,0.002541
3,2011-08-19,1963-04-25,2011-08-19 23:50:00,2011-08-19 18:42:37.379217,2011-08-19 17:50:00,17.833333,13.595417,0.020641,0.0,0.0,...,10,Tumor maligno de órganos respiratorios e intra...,2,C349,-58391.245202,-4435.137647,367.429356,-0.008618,0.00919,0.002557
4,2012-03-15,1942-03-15,2012-03-15 15:15:00,2012-03-15 10:13:19.530816,2012-03-15 09:15:00,9.25,12.708176,-0.021258,1235.542,-181.98178,...,28,Enfermedades isquémicas del corazón,2,I219,-58346.799407,-4481.063804,354.896123,-0.009103,0.009131,0.002428


In [None]:
df = df.withColumn("anio_deceso", year(col("decease_date")))
df = df.withColumn("mes_deceso", month(col("decease_date")))

# 2. Calcular edad al fallecer
df = df.withColumn("edad_fallecimiento", floor(datediff(col("decease_date"), col("birth_date")) / 365.25))

# 3. Crear grupos de edad (bins)
df = df.withColumn(
    "grupo_edad",
    when(col("edad_fallecimiento") < 18, "Menor de edad")
    .when((col("edad_fallecimiento") >= 18) & (col("edad_fallecimiento") < 30), "18-29")
    .when((col("edad_fallecimiento") >= 30) & (col("edad_fallecimiento") < 45), "30-44")
    .when((col("edad_fallecimiento") >= 45) & (col("edad_fallecimiento") < 60), "45-59")
    .when((col("edad_fallecimiento") >= 60) & (col("edad_fallecimiento") < 75), "60-74")
    .otherwise("75+")
)

# 4. Traducir sexo (1 = Hombre, 2 = Mujer)
df = df.withColumn(
    "sexo_desc",
    when(col("sexo") == 1, "Hombre")
    .when(col("sexo") == 2, "Mujer")
    .otherwise("Desconocido")
)

# Mostrar 5 registros con columnas nuevas
df.select("decease_date", "birth_date", "anio_deceso", "mes_deceso", "edad_fallecimiento", "grupo_edad", "sexo", "sexo_desc").limit(5).toPandas()

Unnamed: 0,decease_date,birth_date,anio_deceso,mes_deceso,edad_fallecimiento,grupo_edad,sexo,sexo_desc
0,2011-12-27,1929-01-06,2011,12,82,75+,1,Hombre
1,2011-09-29,1939-01-21,2011,9,72,60-74,1,Hombre
2,2011-09-29,1925-04-25,2011,9,86,75+,1,Hombre
3,2011-08-19,1963-04-25,2011,8,48,45-59,2,Mujer
4,2012-03-15,1942-03-15,2012,3,70,60-74,2,Mujer


In [None]:
# 1. Calcular combinaciones únicas y conteo
combinaciones = df.groupBy("sexo_desc", "grupo_edad").count()

# 2. Calcular proporción total
total = df.count()
combinaciones = combinaciones.withColumn("proporcion", col("count") / total)

# 3. Mostrar las combinaciones
combinaciones.orderBy("sexo_desc", "grupo_edad").toPandas()


Unnamed: 0,sexo_desc,grupo_edad,count,proporcion
0,Hombre,18-29,241070,0.031809
1,Hombre,30-44,433762,0.057234
2,Hombre,45-59,789234,0.104138
3,Hombre,60-74,1151546,0.151944
4,Hombre,75+,1411723,0.186274
5,Hombre,Menor de edad,214917,0.028358
6,Mujer,18-29,76993,0.010159
7,Mujer,30-44,182593,0.024093
8,Mujer,45-59,493396,0.065103
9,Mujer,60-74,899606,0.118701


In [None]:
# Ejemplo: Filtrar partición Mujer + 60-74
muestra_mujer_60_74 = df.filter(
    (col("sexo_desc") == "Mujer") & (col("grupo_edad") == "60-74")
)

# Mostrar algunas filas de esa muestra
muestra_mujer_60_74.select("decease_date", "edad_fallecimiento", "grupo_edad", "sexo_desc").limit(5).toPandas()



Unnamed: 0,decease_date,edad_fallecimiento,grupo_edad,sexo_desc
0,2012-03-15,70,60-74,Mujer
1,2011-12-05,67,60-74,Mujer
2,2011-08-04,71,60-74,Mujer
3,2012-07-26,67,60-74,Mujer
4,2011-07-09,62,60-74,Mujer


In [None]:
dfm = muestra_mujer_60_74.select(
    "decease_date", "birth_date", "anio_deceso",
    "mes_deceso", "edad_fallecimiento",
    "grupo_edad", "sexo", "sexo_desc"
)

dfm.show(5)

+------------+----------+-----------+----------+------------------+----------+----+---------+
|decease_date|birth_date|anio_deceso|mes_deceso|edad_fallecimiento|grupo_edad|sexo|sexo_desc|
+------------+----------+-----------+----------+------------------+----------+----+---------+
|  2012-03-15|1942-03-15|       2012|         3|                70|     60-74|   2|    Mujer|
|  2011-12-05|1944-08-04|       2011|        12|                67|     60-74|   2|    Mujer|
|  2011-08-04|1940-07-15|       2011|         8|                71|     60-74|   2|    Mujer|
|  2012-07-26|1945-06-02|       2012|         7|                67|     60-74|   2|    Mujer|
|  2011-07-09|1949-04-16|       2011|         7|                62|     60-74|   2|    Mujer|
+------------+----------+-----------+----------+------------------+----------+----+---------+
only showing top 5 rows



In [None]:
dfm.show(5)
dfm.printSchema()

+------------+----------+-----------+----------+------------------+----------+----+---------+
|decease_date|birth_date|anio_deceso|mes_deceso|edad_fallecimiento|grupo_edad|sexo|sexo_desc|
+------------+----------+-----------+----------+------------------+----------+----+---------+
|  2012-03-15|1942-03-15|       2012|         3|                70|     60-74|   2|    Mujer|
|  2011-12-05|1944-08-04|       2011|        12|                67|     60-74|   2|    Mujer|
|  2011-08-04|1940-07-15|       2011|         8|                71|     60-74|   2|    Mujer|
|  2012-07-26|1945-06-02|       2012|         7|                67|     60-74|   2|    Mujer|
|  2011-07-09|1949-04-16|       2011|         7|                62|     60-74|   2|    Mujer|
+------------+----------+-----------+----------+------------------+----------+----+---------+
only showing top 5 rows

root
 |-- decease_date: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- anio_deceso: integer (nullable = 

In [None]:
# Limpieza básica de datos
#Se eliminan registros con valores nulos
df_clean = dfm.dropna()

#Se eliminan registros duplicados
df_clean= df_clean.dropDuplicates()

df_clean.explain()
df_clean.show(5)

print("Número de registros: " + str(df_clean.count()))
print("Número de columnas: " + str(len(df_clean.columns)))

== Physical Plan ==
*(2) HashAggregate(keys=[grupo_edad#163, anio_deceso#85, sexo_desc#191, edad_fallecimiento#136L, mes_deceso#110, sexo#31, decease_date#16, birth_date#17], functions=[])
+- Exchange hashpartitioning(grupo_edad#163, anio_deceso#85, sexo_desc#191, edad_fallecimiento#136L, mes_deceso#110, sexo#31, decease_date#16, birth_date#17, 200), ENSURE_REQUIREMENTS, [id=#155]
   +- *(1) HashAggregate(keys=[grupo_edad#163, anio_deceso#85, sexo_desc#191, edad_fallecimiento#136L, mes_deceso#110, sexo#31, decease_date#16, birth_date#17], functions=[])
      +- *(1) Project [decease_date#16, birth_date#17, year(cast(decease_date#16 as date)) AS anio_deceso#85, month(cast(decease_date#16 as date)) AS mes_deceso#110, FLOOR((cast(datediff(cast(decease_date#16 as date), cast(birth_date#17 as date)) as double) / 365.25)) AS edad_fallecimiento#136L, CASE WHEN (FLOOR((cast(datediff(cast(decease_date#16 as date), cast(birth_date#17 as date)) as double) / 365.25)) < 18) THEN Menor de edad WHEN 

In [None]:
# Particionamiento típico
# Se define el valor por default del número de ejecutores
spark.conf.set("spark.sql.shuffle.partitions", "200")
train_data,test_data = df_clean.randomSplit([0.8,0.2], seed = 42)
print(f"""Existen {train_data.count()} instancias en el conjunto train, y {test_data.count()} en el conjunto test""")

Existen 701104 instancias en el conjunto train, y 175336 en el conjunto test


In [None]:
# prompt: imprimir el plan de transformación al llamar randomSplit

train_data.explain()
test_data.explain()


== Physical Plan ==
*(2) Sample 0.0, 0.8, false, 42
+- *(2) Sort [decease_date#16 ASC NULLS FIRST, birth_date#17 ASC NULLS FIRST, anio_deceso#85 ASC NULLS FIRST, mes_deceso#110 ASC NULLS FIRST, edad_fallecimiento#136L ASC NULLS FIRST, grupo_edad#163 ASC NULLS FIRST, sexo#31 ASC NULLS FIRST, sexo_desc#191 ASC NULLS FIRST], false, 0
   +- *(2) HashAggregate(keys=[grupo_edad#163, anio_deceso#85, sexo_desc#191, edad_fallecimiento#136L, mes_deceso#110, sexo#31, decease_date#16, birth_date#17], functions=[])
      +- Exchange hashpartitioning(grupo_edad#163, anio_deceso#85, sexo_desc#191, edad_fallecimiento#136L, mes_deceso#110, sexo#31, decease_date#16, birth_date#17, 200), ENSURE_REQUIREMENTS, [id=#379]
         +- *(1) HashAggregate(keys=[grupo_edad#163, anio_deceso#85, sexo_desc#191, edad_fallecimiento#136L, mes_deceso#110, sexo#31, decease_date#16, birth_date#17], functions=[])
            +- *(1) Project [decease_date#16, birth_date#17, year(cast(decease_date#16 as date)) AS anio_deces

In [None]:
# Se imprime algunos registros del train y test generado
train_data.show(5)
test_data.show(5)

+------------+----------+-----------+----------+------------------+----------+----+---------+
|decease_date|birth_date|anio_deceso|mes_deceso|edad_fallecimiento|grupo_edad|sexo|sexo_desc|
+------------+----------+-----------+----------+------------------+----------+----+---------+
|  2000-07-19|1940-02-22|       2000|         7|                60|     60-74|   2|    Mujer|
|  2001-08-06|1935-06-21|       2001|         8|                66|     60-74|   2|    Mujer|
|  2005-11-06|1935-01-25|       2005|        11|                70|     60-74|   2|    Mujer|
|  2011-09-03|1947-01-18|       2011|         9|                64|     60-74|   2|    Mujer|
|  2011-11-19|1949-01-20|       2011|        11|                62|     60-74|   2|    Mujer|
+------------+----------+-----------+----------+------------------+----------+----+---------+
only showing top 5 rows

+------------+----------+-----------+----------+------------------+----------+----+---------+
|decease_date|birth_date|anio_deces

In [None]:
# Se define el valor por default del número de ejecutores
spark.conf.set("spark.sql.shuffle.partitions", "1")
train_data,test_data = df_clean.randomSplit([0.8,0.2], seed = 42)
print(f"""Existen {train_data.count()} instancias en el conjunto train, y {test_data.count()} en el conjunto test""")

Existen 701609 instancias en el conjunto train, y 174831 en el conjunto test


In [17]:
# Se imprime algunos registros del train y test generado
train_data.show(5)
test_data.show(5)

+------------+----------+-----------+----------+------------------+----------+----+---------+
|decease_date|birth_date|anio_deceso|mes_deceso|edad_fallecimiento|grupo_edad|sexo|sexo_desc|
+------------+----------+-----------+----------+------------------+----------+----+---------+
|  1970-08-11|1908-12-29|       1970|         8|                61|     60-74|   2|    Mujer|
|  1976-07-12|1913-08-13|       1976|         7|                62|     60-74|   2|    Mujer|
|  1982-12-28|1914-06-14|       1982|        12|                68|     60-74|   2|    Mujer|
|  1983-09-21|1918-01-13|       1983|         9|                65|     60-74|   2|    Mujer|
|  1983-12-01|1915-12-23|       1983|        12|                67|     60-74|   2|    Mujer|
+------------+----------+-----------+----------+------------------+----------+----+---------+
only showing top 5 rows

+------------+----------+-----------+----------+------------------+----------+----+---------+
|decease_date|birth_date|anio_deces

In [18]:
print("Número de registros: " + str(df_clean.count()))
print("Número de columnas: " + str(len(df_clean.columns)))

Número de registros: 876440
Número de columnas: 8


In [19]:
from pyspark.sql.functions import col
from pyspark.ml.feature import QuantileDiscretizer

# Se calcula el valor de discretización a partir del número de bins. El resultado se almacena en "medv_binned"
num_bins = 2
quantile_discretizer = QuantileDiscretizer(numBuckets=num_bins, inputCol="edad_fallecimiento", outputCol="ef_bin")

# Aplicar transformación
df_clean_bin = quantile_discretizer.fit(df_clean).transform(df_clean)

df_clean_bin.show()
print("Número de registros: " + str(df_clean_bin.count()))
print("Número de columnas: " + str(len(df_clean_bin.columns)))

+------------+----------+-----------+----------+------------------+----------+----+---------+------+
|decease_date|birth_date|anio_deceso|mes_deceso|edad_fallecimiento|grupo_edad|sexo|sexo_desc|ef_bin|
+------------+----------+-----------+----------+------------------+----------+----+---------+------+
|  2012-03-15|1942-03-15|       2012|         3|                70|     60-74|   2|    Mujer|   1.0|
|  2011-12-05|1944-08-04|       2011|        12|                67|     60-74|   2|    Mujer|   0.0|
|  2011-08-04|1940-07-15|       2011|         8|                71|     60-74|   2|    Mujer|   1.0|
|  2012-07-26|1945-06-02|       2012|         7|                67|     60-74|   2|    Mujer|   0.0|
|  2011-07-09|1949-04-16|       2011|         7|                62|     60-74|   2|    Mujer|   0.0|
|  2011-12-29|1945-03-01|       2011|        12|                66|     60-74|   2|    Mujer|   0.0|
|  2011-12-22|1946-03-22|       2011|        12|                65|     60-74|   2|    Muje

In [20]:
# Se obtiene el número de instancias por bin
stratum_counts = df_clean_bin.groupBy("ef_bin").count().collect()
total_count = df_clean_bin.count()

print(stratum_counts)
print(total_count)

[Row(ef_bin=1.0, count=444303), Row(ef_bin=0.0, count=432137)]
876440


In [21]:
# Se calcula la probabilidad del test de cada bin de acuerdo al porcentaje de division a usar (70 - 30)

stratum_fractions = {row["ef_bin"]: 0.3 * (row["count"] / total_count)
                     for row in stratum_counts}

print(stratum_fractions)

{1.0: 0.152082173337593, 0.0: 0.147917826662407}


In [33]:
# Se generan los conjuntos a partir de muestreo estratificado
from pyspark.ml.feature import StringIndexer, VectorAssembler

indexer = StringIndexer(inputCol="grupo_edad", outputCol="grupo_edad_index")
df_indexed = indexer.fit(df_clean_bin).transform(df_clean_bin)

test_data = df_indexed.sampleBy("ef_bin", fractions=stratum_fractions, seed=42)
train_data = df_indexed.exceptAll(test_data)

print(f"""Existen {train_data.count()} instancias en el conjunto train, y {test_data.count()} en el conjunto test""")

train_data.show(5)
test_data.show(5)

Existen 744533 instancias en el conjunto train, y 131907 en el conjunto test
+------------+----------+-----------+----------+------------------+----------+----+---------+------+----------------+
|decease_date|birth_date|anio_deceso|mes_deceso|edad_fallecimiento|grupo_edad|sexo|sexo_desc|ef_bin|grupo_edad_index|
+------------+----------+-----------+----------+------------------+----------+----+---------+------+----------------+
|  2012-03-15|1942-03-15|       2012|         3|                70|     60-74|   2|    Mujer|   1.0|             0.0|
|  2011-12-05|1944-08-04|       2011|        12|                67|     60-74|   2|    Mujer|   0.0|             0.0|
|  2011-08-04|1940-07-15|       2011|         8|                71|     60-74|   2|    Mujer|   1.0|             0.0|
|  2012-07-26|1945-06-02|       2012|         7|                67|     60-74|   2|    Mujer|   0.0|             0.0|
|  2011-07-09|1949-04-16|       2011|         7|                62|     60-74|   2|    Mujer|   0

In [34]:
assembler = VectorAssembler( inputCols=['anio_deceso', 'mes_deceso', 'sexo', 'grupo_edad_index'], outputCol = 'Attributes')
output_train = assembler.transform(train_data)
output_test = assembler.transform(test_data)

output_train.show(5)
output_test.show(5)

+------------+----------+-----------+----------+------------------+----------+----+---------+------+----------------+--------------------+
|decease_date|birth_date|anio_deceso|mes_deceso|edad_fallecimiento|grupo_edad|sexo|sexo_desc|ef_bin|grupo_edad_index|          Attributes|
+------------+----------+-----------+----------+------------------+----------+----+---------+------+----------------+--------------------+
|  2012-03-15|1942-03-15|       2012|         3|                70|     60-74|   2|    Mujer|   1.0|             0.0|[2012.0,3.0,2.0,0.0]|
|  2011-12-05|1944-08-04|       2011|        12|                67|     60-74|   2|    Mujer|   0.0|             0.0|[2011.0,12.0,2.0,...|
|  2011-08-04|1940-07-15|       2011|         8|                71|     60-74|   2|    Mujer|   1.0|             0.0|[2011.0,8.0,2.0,0.0]|
|  2012-07-26|1945-06-02|       2012|         7|                67|     60-74|   2|    Mujer|   0.0|             0.0|[2012.0,7.0,2.0,0.0]|
|  2011-07-09|1949-04-16|  

In [35]:
# Se manda a entrenar con un modelo de regresion Lineal
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'Attributes', labelCol = 'edad_fallecimiento', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(output_train)

In [36]:
# Se Imprimen los valores de los coeficientes
print ("The coefficient of the model is : ", lr_model.coefficients)
print ("The Intercept of the model is : ", lr_model.intercept)

The coefficient of the model is :  [0.0,0.0,0.0,0.0]
The Intercept of the model is :  67.405444755303


In [37]:
# Se aplica el modelo al conjunto test
Pred_lr = lr_model.evaluate(output_test)

In [38]:
# Evaluación del modelo
from pyspark.ml.evaluation import RegressionEvaluator

#Root Mean Square Error
eval_lr = RegressionEvaluator(labelCol="edad_fallecimiento", predictionCol="prediction", metricName="rmse")
rmse_lr = eval_lr.evaluate(Pred_lr.predictions)
print("RMSE: %.3f" % rmse_lr)

# Mean Square Error
mse = eval_lr.evaluate(Pred_lr.predictions, {eval_lr.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval_lr.evaluate(Pred_lr.predictions, {eval_lr.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval_lr.evaluate(Pred_lr.predictions, {eval_lr.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 4.287
MSE: 18.378
MAE: 3.712
r2: -0.000


In [39]:

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Entrenar modelo KMeans
kmeans = KMeans(featuresCol='Attributes', k=3, seed=42)  # puedes ajustar k a lo que quieras explorar
kmeans_model = kmeans.fit(output_train)

# Aplicar modelo al conjunto de entrenamiento
kmeans_predictions = kmeans_model.transform(output_train)

# Ver los clusters asignados
kmeans_predictions.select("prediction", "anio_deceso", "mes_deceso", "sexo", "grupo_edad_index").show(10)

# Evaluar clustering usando silhouette score
evaluator = ClusteringEvaluator(featuresCol='Attributes', metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette = evaluator.evaluate(kmeans_predictions)
print(f"Silhouette with squared Euclidean distance = {silhouette:.3f}")

# Ver los centros de los clusters
centers = kmeans_model.clusterCenters()
print("Cluster Centers:")
for idx, center in enumerate(centers):
    print(f"Cluster {idx}: {center}")

+----------+-----------+----------+----+----------------+
|prediction|anio_deceso|mes_deceso|sexo|grupo_edad_index|
+----------+-----------+----------+----+----------------+
|         2|       2012|         3|   2|             0.0|
|         1|       2011|        12|   2|             0.0|
|         1|       2011|         8|   2|             0.0|
|         1|       2012|         7|   2|             0.0|
|         1|       2011|         7|   2|             0.0|
|         1|       2011|        12|   2|             0.0|
|         1|       2011|        12|   2|             0.0|
|         1|       2011|        12|   2|             0.0|
|         1|       2011|        12|   2|             0.0|
|         1|       2011|        12|   2|             0.0|
+----------+-----------+----------+----+----------------+
only showing top 10 rows

Silhouette with squared Euclidean distance = 0.594
Cluster Centers:
Cluster 0: [2.02018688e+03 9.17095723e+00 2.00000000e+00 0.00000000e+00]
Cluster 1: [2.0146013

# **Conclusiones**
Después de la limpieza, se trabajó con un número reducido de registros mediante muestreo estratificado para evitar tiempos de procesamiento elevados.  
Se generaron variables derivadas como `anio_deceso`, `mes_deceso`, `edad_fallecimiento`, y se categorizaron variables como `grupo_edad` y `ef_bin`.

**Aprendizaje supervisado (Regresión lineal):**  
El modelo buscó predecir la edad de fallecimiento usando año, mes y sexo como predictores.  Las métricas (RMSE, MSE, MAE, R²) indicaron errores considerables y baja capacidad predictiva, probablemente porque las variables disponibles no capturan suficiente variabilidad.  
Enriquecer el dataset con variables adicionales como causas de muerte, comorbilidades o factores socioeconómicos para mejorar las predicciones.

**Aprendizaje no supervisado (KMeans):**  
Se aplicó KMeans con k=3 para agrupar los datos.  
El Silhouette Score (métrica de evaluación del clustering) permitió valorar la separación de los clusters, aunque los resultados sugieren que se necesitan variables más diferenciadoras para lograr grupos más significativos.  

Esta actividad permitió practicar un pipeline completo de machine learning en PySpark, desde la preparación de datos hasta la evaluación de modelos, identificando retos y oportunidades de mejora para futuros análisis.