# Proyecto GEIH 2024 - ETL en PySpark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("GEIH_ETL_2024").getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1749523038563_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Funciones para unión segura y procesamiento mensual

In [2]:
def safe_join(df1, df2, claves, prefix):
    renamed = df2.select([col(c).alias(f"{prefix.lower()}_{c.lower()}") if c not in claves else col(c) for c in df2.columns])
    return df1.join(renamed, on=claves, how="left")

def procesar_mes(mes, modulos):
    ruta_mes = f"s3://geih-datalake-2024/raw/{mes}/"
    datos = {}
    for modulo, claves in modulos.items():
        try:
            df = spark.read.option("header", True).option("sep", ";").option("encoding", "latin1").csv(ruta_mes + f"{modulo}.CSV")
            datos[modulo] = df
            print(f"[{mes}] Módulo {modulo} cargado.")
        except Exception as e:
            print(f"[{mes}] Error módulo {modulo}: {e}")

    df_base = datos["10"]
    for cod in ["50", "60", "70", "80", "90", "94"]:
        if cod in datos:
            df_base = safe_join(df_base, datos[cod], ["DIRECTORIO", "SECUENCIA_P", "ORDEN"], cod)
    df_base = safe_join(df_base, datos["01"], ["DIRECTORIO", "SECUENCIA_P"], "01")
    df_base = df_base.select([col(c).alias(c.lower().strip().replace(" ", "_")) for c in df_base.columns])
    df_base = df_base.dropna(subset=["directorio", "secuencia_p", "orden"])
    ruta_salida = f"s3://geih-datalake-2024/trusted/{mes}/"
    df_base.write.mode("overwrite").parquet(ruta_salida)
    print(f"Datos de {mes} guardados en zona trusted.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Ejecución del procesamiento para todos los meses

In [3]:
modulos = {
    "01": ["DIRECTORIO", "SECUENCIA_P"],
    "10": ["DIRECTORIO", "SECUENCIA_P", "ORDEN"],
    "50": ["DIRECTORIO", "SECUENCIA_P", "ORDEN"],
    "60": ["DIRECTORIO", "SECUENCIA_P", "ORDEN"],
    "70": ["DIRECTORIO", "SECUENCIA_P", "ORDEN"],
    "80": ["DIRECTORIO", "SECUENCIA_P", "ORDEN"],
    "90": ["DIRECTORIO", "SECUENCIA_P", "ORDEN"],
    "94": ["DIRECTORIO", "SECUENCIA_P", "ORDEN"]
}

meses = ["enero", "febrero", "marzo", "abril", "mayo", "junio",
         "julio", "agosto", "septiembre", "octubre", "noviembre", "diciembre"]

for mes in meses:
    procesar_mes(mes, modulos)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[enero] M?dulo 01 cargado.
[enero] M?dulo 10 cargado.
[enero] M?dulo 50 cargado.
[enero] M?dulo 60 cargado.
[enero] M?dulo 70 cargado.
[enero] M?dulo 80 cargado.
[enero] M?dulo 90 cargado.
[enero] M?dulo 94 cargado.
Datos de enero guardados en zona trusted.
[febrero] M?dulo 01 cargado.
[febrero] M?dulo 10 cargado.
[febrero] M?dulo 50 cargado.
[febrero] M?dulo 60 cargado.
[febrero] M?dulo 70 cargado.
[febrero] M?dulo 80 cargado.
[febrero] M?dulo 90 cargado.
[febrero] M?dulo 94 cargado.
Datos de febrero guardados en zona trusted.
[marzo] M?dulo 01 cargado.
[marzo] M?dulo 10 cargado.
[marzo] M?dulo 50 cargado.
[marzo] M?dulo 60 cargado.
[marzo] M?dulo 70 cargado.
[marzo] M?dulo 80 cargado.
[marzo] M?dulo 90 cargado.
[marzo] M?dulo 94 cargado.
Datos de marzo guardados en zona trusted.
[abril] M?dulo 01 cargado.
[abril] M?dulo 10 cargado.
[abril] M?dulo 50 cargado.
[abril] M?dulo 60 cargado.
[abril] M?dulo 70 cargado.
[abril] M?dulo 80 cargado.
[abril] M?dulo 90 cargado.
[abril] M?dulo 94 c

## Consolidación de todos los meses

In [4]:
df_geih_2024 = spark.read.parquet("s3://geih-datalake-2024/trusted/*/")
df_geih_2024.write.mode("overwrite").parquet("s3://geih-datalake-2024/trusted/geih2024_full/")
df_geih_2024.printSchema()

print("Consolidado anual guardado en zona trusted.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- directorio: string (nullable = true)
 |-- secuencia_p: string (nullable = true)
 |-- orden: string (nullable = true)
 |-- periodo: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- per: string (nullable = true)
 |-- hogar: string (nullable = true)
 |-- regis: string (nullable = true)
 |-- area: string (nullable = true)
 |-- clase: string (nullable = true)
 |-- fex_c18: string (nullable = true)
 |-- dpto: string (nullable = true)
 |-- pt: string (nullable = true)
 |-- p6016: string (nullable = true)
 |-- p3271: string (nullable = true)
 |-- p6040: string (nullable = true)
 |-- p6030s1: string (nullable = true)
 |-- p6030s3: string (nullable = true)
 |-- p6050: string (nullable = true)
 |-- p6083: string (nullable = true)
 |-- p6083s1: string (nullable = true)
 |-- p6081: string (nullable = true)
 |-- p6081s1: string (nullable = true)
 |-- p2057: string (nullable = true)
 |-- p2059: string (nullable = true)
 |-- p2061: string (nullable = true)
 |-- p6080: string 

## Eliminación de columnas con más del 40% de nulos

In [5]:
from pyspark.sql.functions import col, count, when, isnan

# Conteo de nulos por columna
total = df_geih_2024.count()
nulos = df_geih_2024.select([
    (count(when(col(c).isNull() | isnan(c), c)) / total).alias(c)
    for c in df_geih_2024.columns
])
nulos.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+-----+-------+---+---+-----+-----+------------------+-----+-------+----+---+-----+-----+-----+--------------------+--------------------+-----+-----+------------------+-----+-----------------+-----+-------------------+-----+-----+------------------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----+-------------------+------------------+------------------+-------+-------+-------+-------+-------+-------+-------+-------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+--------------

In [7]:
nulos_row = nulos.collect()[0].asDict()
columnas_a_eliminar = [col for col, prop in nulos_row.items() if prop > 0.4]

print("Columnas a eliminar (más del 40% nulos):")
print(columnas_a_eliminar)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Columnas a eliminar (m?s del 40% nulos):
['p6083s1', 'p6081s1', 'p6080s1', 'p6080s1a1', 'p6071', 'p6071s1', 'p3147s1', 'p3147s2', 'p3147s3', 'p3147s4', 'p3147s5', 'p3147s6', 'p3147s7', 'p3147s8', 'p3147s9', 'p3147s10', 'p3147s11', 'p3147s10a1', 'p6110', 'p6120', 'p3041', 'p3042s2', 'p3043', 'p3043s1', '50_area', '50_ft', '50_fft', '50_p6240s1', '50_p6240s2', '50_p6250', '50_p6260', '50_p6260s1', '50_p6260s1a1', '50_p6260s2', '50_p6270', '50_p6280', '50_p6290', '50_p3362s1', '50_p3362s2', '50_p3362s3', '50_p3362s4', '50_p3362s5', '50_p3362s6', '50_p3362s7', '50_p3362s8', '50_p3362s7a1', '50_p6300', '50_p6310', '50_p6310s1', '50_p6320', '50_p6330', '50_p6340', '50_p6350', '50_p6351', '60_periodo', '60_mes', '60_per', '60_hogar', '60_regis', '60_area', '60_clase', '60_fex_c18', '60_dpto', '60_ft', '60_p3044s2', '60_p6440', '60_p6450', '60_p6460', '60_p6460s1', '60_p6400', '60_p6410', '60_p6422', '60_p6420s2', '60_p6424s1', '60_p6424s2', '60_p6424s3', '60_p6424s5', '60_p6426', '60_p6430', 

In [8]:
df_limpio = df_geih_2024.drop(*columnas_a_eliminar)

# Validar resultados
print("Columnas iniciales:", len(df_geih_2024.columns))
print("Columnas eliminadas:", len(columnas_a_eliminar))
print("Columnas finales:", len(df_limpio.columns))
df_limpio.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Columnas iniciales: 589
Columnas eliminadas: 430
Columnas finales: 159
root
 |-- directorio: string (nullable = true)
 |-- secuencia_p: string (nullable = true)
 |-- orden: string (nullable = true)
 |-- periodo: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- per: string (nullable = true)
 |-- hogar: string (nullable = true)
 |-- regis: string (nullable = true)
 |-- area: string (nullable = true)
 |-- clase: string (nullable = true)
 |-- fex_c18: string (nullable = true)
 |-- dpto: string (nullable = true)
 |-- pt: string (nullable = true)
 |-- p6016: string (nullable = true)
 |-- p3271: string (nullable = true)
 |-- p6040: string (nullable = true)
 |-- p6030s1: string (nullable = true)
 |-- p6030s3: string (nullable = true)
 |-- p6050: string (nullable = true)
 |-- p6083: string (nullable = true)
 |-- p6081: string (nullable = true)
 |-- p2057: string (nullable = true)
 |-- p2059: string (nullable = true)
 |-- p2061: string (nullable = true)
 |-- p6080: string (nullab

## Columnas adicionales a eliminar

In [9]:
# Lista de columnas a eliminar
columnas_del = [
    "p6016", "p6030s1", "p6030s3", "p6050", "p2059", "p2061",
    "p1906s1", "p1906s2", "p1906s3", "p1906s4", "p1906s5", "p1906s6", "p1906s7", "p1906s8",
    "p3042s1", "p3038", "p3039", "pob_may18",
    "80_p3076s1", "80_p3076s2", "80_p3076s3", "80_p3077s1", "80_p3077s2", "80_p3077s3",
    "80_p3078s1", "80_p3078s2", "80_p3078s3", "80_p3079s1", "80_p3079s2", "80_p3079s3",
    "80_p3081s1", "80_p3081s2", "80_p3081s3", "80_p3082s1", "80_p3082s2", "80_p3082s3",
    "80_p3089", "80_p3091", "80_p3093", "80_p3094", "80_p3095", "80_p3096", "80_p3098", "80_p3099", "80_p3101",
    "90_p3370", "90_p3371", "90_p3372",
    "94_p3373", "94_p3373s1", "94_p3373s2", "94_p3373s3", "94_p3376", 
    "94_p3382", "94_p3382s1", "94_p3382s2", "94_p3382s3", "94_p3383",
    "94_p3384", "94_p3384s1", "94_p3384s2", "94_p3384s3", "94_p3385",
    "01_p4000", "01_p4010", "01_p4020", "01_p4030s1", "01_p4030s1a1", "01_p4030s2", 
    "01_p4030s3", "01_p4030s4", "01_p4030s4a1", "01_p4030s5",
    "01_p70", "01_p5000", "01_p5010", "01_p5020", "01_p5040", "01_p5050", 
    "01_p5070", "01_p5080", "01_p5090", "01_p5130", "01_p5030"
]

# Eliminar las columnas del DataFrame
df_limpio = df_limpio.drop(*columnas_del)

# Confirmar cantidad de columnas después de eliminación
print(f"Columnas después de eliminación: {len(df_limpio.columns)}")
df_limpio.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Columnas despu?s de eliminaci?n: 75
root
 |-- directorio: string (nullable = true)
 |-- secuencia_p: string (nullable = true)
 |-- orden: string (nullable = true)
 |-- periodo: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- per: string (nullable = true)
 |-- hogar: string (nullable = true)
 |-- regis: string (nullable = true)
 |-- area: string (nullable = true)
 |-- clase: string (nullable = true)
 |-- fex_c18: string (nullable = true)
 |-- dpto: string (nullable = true)
 |-- pt: string (nullable = true)
 |-- p3271: string (nullable = true)
 |-- p6040: string (nullable = true)
 |-- p6083: string (nullable = true)
 |-- p6081: string (nullable = true)
 |-- p2057: string (nullable = true)
 |-- p6080: string (nullable = true)
 |-- p6070: string (nullable = true)
 |-- p6090: string (nullable = true)
 |-- p6100: string (nullable = true)
 |-- p6160: string (nullable = true)
 |-- p6170: string (nullable = true)
 |-- p3042: string (nullable = true)
 |-- 50_periodo: string (nul

In [10]:
# Listas de columnas redundantes por módulo
columnas_redundantes = [
    # periodo, mes, per
    "50_periodo", "80_periodo", "90_periodo", "94_periodo", "01_periodo",
    "50_mes", "80_mes", "90_mes", "94_mes", "01_mes",
    "50_per", "80_per", "90_per", "94_per", "01_per",
    
    # hogar, regis, clase, dpto, fex_c18
    "50_hogar", "80_hogar", "90_hogar", "94_hogar", "01_hogar",
    "50_regis", "80_regis", "90_regis", "94_regis", "01_regis",
    "50_clase", "80_clase", "90_clase", "94_clase", "01_clase",
    "50_dpto", "80_dpto", "90_dpto", "94_dpto", "01_dpto",
    "50_fex_c18", "80_fex_c18", "90_fex_c18", "94_fex_c18", "01_fex_c18",
    
    # otros duplicados por módulo
    "94_area", "01_area"
]

# Eliminar columnas redundantes
df_depurado = df_limpio.drop(*columnas_redundantes)

# Confirmar estructura
print(f"Total de columnas después de consolidar variables duplicadas: {len(df_depurado.columns)}")
df_depurado.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total de columnas despu?s de consolidar variables duplicadas: 33
root
 |-- directorio: string (nullable = true)
 |-- secuencia_p: string (nullable = true)
 |-- orden: string (nullable = true)
 |-- periodo: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- per: string (nullable = true)
 |-- hogar: string (nullable = true)
 |-- regis: string (nullable = true)
 |-- area: string (nullable = true)
 |-- clase: string (nullable = true)
 |-- fex_c18: string (nullable = true)
 |-- dpto: string (nullable = true)
 |-- pt: string (nullable = true)
 |-- p3271: string (nullable = true)
 |-- p6040: string (nullable = true)
 |-- p6083: string (nullable = true)
 |-- p6081: string (nullable = true)
 |-- p2057: string (nullable = true)
 |-- p6080: string (nullable = true)
 |-- p6070: string (nullable = true)
 |-- p6090: string (nullable = true)
 |-- p6100: string (nullable = true)
 |-- p6160: string (nullable = true)
 |-- p6170: string (nullable = true)
 |-- p3042: string (nullable = true)

## Imputación de datos y transformaciones

In [11]:
from pyspark.sql.functions import col, count, when, isnan, lit

# Total de filas del DataFrame
total_filas = df_depurado.count()

# Crear lista con proporción de nulos por columna
nulos_df = df_depurado.select([
    (count(when(col(c).isNull() | isnan(c) | (col(c) == ''), c)) / total_filas).alias(c)
    for c in df_depurado.columns
])

# Transponer el DataFrame de nulos para visualizarlo mejor
nulos_por_columna = (
    nulos_df.selectExpr("stack(" + str(len(nulos_df.columns)) + "," +
                        ",".join([f"'{c}', `{c}`" for c in nulos_df.columns]) + ") as (columna, proporcion_nulos)")
    .orderBy(col("proporcion_nulos").desc())
)

# Mostrar resultado
nulos_por_columna.show(n=50, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-------------------+
|columna    |proporcion_nulos   |
+-----------+-------------------+
|01_p5222s2 |0.32836758135336025|
|area       |0.2788703637413325 |
|50_pet     |0.2071634588149932 |
|50_p6240   |0.2071634588149932 |
|90_p7495   |0.2071634588149932 |
|90_p7505   |0.2071634588149932 |
|90_p3367   |0.2071634588149932 |
|p6070      |0.1289034486665389 |
|p6100      |0.03877384495042082|
|p6160      |0.03148069804973707|
|p6170      |0.03148069804973707|
|p3042      |0.03148069804973707|
|directorio |0.0                |
|secuencia_p|0.0                |
|orden      |0.0                |
|periodo    |0.0                |
|mes        |0.0                |
|per        |0.0                |
|hogar      |0.0                |
|regis      |0.0                |
|clase      |0.0                |
|fex_c18    |0.0                |
|dpto       |0.0                |
|pt         |0.0                |
|p3271      |0.0                |
|p6040      |0.0                |
|p6083      |0

In [12]:
from pyspark.sql.functions import col, when, count, isnan, lit, desc

# Variables binarias codificadas como 1 (Sí) y 2 (No)
variables_binarias = [
    "p6083", "p6081", "p2057", "p6090", "p6160", "p6170", "01_p5222s2", "90_p7495",
    "90_p7505", "90_p3367", "p3271"
]

# Imputar nulos en variables binarias con 2 (No)
for col_name in variables_binarias:
    df_depurado = df_depurado.withColumn(
        col_name,
        when(col(col_name).isNull(), lit("2")).otherwise(col(col_name))
    )

# Variables categóricas para imputación con la moda
variables_categoricas = ["p6070", "p6100", "p3042", "94_p3374", "50_p6240"]

# Imputar nulos en variables categóricas con la moda
for var in variables_categoricas:
    moda = (
        df_depurado.groupBy(var)
        .agg(count("*").alias("frecuencia"))
        .orderBy(desc("frecuencia"))
        .filter(col(var).isNotNull())
        .limit(1)
        .collect()[0][0]
    )
    df_depurado = df_depurado.withColumn(
        var,
        when(col(var).isNull(), lit(moda)).otherwise(col(var))
    )

# Verificar proporción de nulos después de imputación
total = df_depurado.count()
proporcion_nulos = df_depurado.select([
    (count(when(col(c).isNull() | isnan(c), c)) / total).alias(c)
    for c in df_depurado.columns
])

proporcion_nulos.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+-----+-------+---+---+-----+-----+------------------+-----+-------+----+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------------------+--------+--------+--------+--------+--------+----------+--------+
|directorio|secuencia_p|orden|periodo|mes|per|hogar|regis|area              |clase|fex_c18|dpto|pt |p3271|p6040|p6083|p6081|p2057|p6080|p6070|p6090|p6100|p6160|p6170|p3042|50_pet            |50_p6240|90_p7495|90_p7505|90_p3367|94_p3374|01_p5222s2|01_p6008|
+----------+-----------+-----+-------+---+---+-----+-----+------------------+-----+-------+----+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------------------+--------+--------+--------+--------+--------+----------+--------+
|0.0       |0.0        |0.0  |0.0    |0.0|0.0|0.0  |0.0  |0.2788703637413325|0.0  |0.0    |0.0 |0.0|0.0  |0.0  |0.0  |0.0  |0.0  |0.0  |0.0  |0.0  |0.0  |0.0  |0.0  |0.0  |0.2071634588149932|0.0     |0.0     |0.0     |0.0     |0.

In [13]:
# Mostrar tipos de datos
df_depurado.printSchema()

# Ver muestra de los datos
df_depurado.show(10, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- directorio: string (nullable = true)
 |-- secuencia_p: string (nullable = true)
 |-- orden: string (nullable = true)
 |-- periodo: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- per: string (nullable = true)
 |-- hogar: string (nullable = true)
 |-- regis: string (nullable = true)
 |-- area: string (nullable = true)
 |-- clase: string (nullable = true)
 |-- fex_c18: string (nullable = true)
 |-- dpto: string (nullable = true)
 |-- pt: string (nullable = true)
 |-- p3271: string (nullable = true)
 |-- p6040: string (nullable = true)
 |-- p6083: string (nullable = true)
 |-- p6081: string (nullable = true)
 |-- p2057: string (nullable = true)
 |-- p6080: string (nullable = true)
 |-- p6070: string (nullable = true)
 |-- p6090: string (nullable = true)
 |-- p6100: string (nullable = true)
 |-- p6160: string (nullable = true)
 |-- p6170: string (nullable = true)
 |-- p3042: string (nullable = true)
 |-- 50_pet: string (nullable = true)
 |-- 50_p6240: string (nul

In [14]:
from pyspark.sql.functions import when, col
from pyspark.sql.types import DoubleType

# Convertir columnas a tipo Double para facilitar el tratamiento
columnas_convertir = [
    "p3271", "p6040", "p6083", "p6081", "p2057", "p6080", "p6070",
    "p6090", "p6100", "p6160", "p6170", "p3042", "50_p6240",
    "90_p7495", "90_p7505", "90_p3367", "94_p3374", "01_p5222s2", "01_p6008"
]
for col_name in columnas_convertir:
    df_depurado = df_depurado.withColumn(col_name, col(col_name).cast(DoubleType()))

# Variables binarias (donde 1 = sí, 2 = no, 9 = no informa)
binarias = ["p3271", "p6083", "p6081", "p6080", "p6100", "p2057", "p6160", "p6170", "01_p5222s2", "p6090", "90_p3367"]
for col_name in ["p3271", "p6083", "p6081", "p2057", "p6090", "01_p5222s2", "90_p3367"]:
    df_depurado = df_depurado.withColumn(
        f"{col_name}_bin",
        when(col(col_name) == 1, 1).when(col(col_name) == 2, 0).otherwise(None)
    )

# Variables categóricas nominales - aplicar one-hot encoding manual
categoricas_nominales = {
    "p6080": [1, 2, 3, 4, 5, 6],
    "p6070": [1, 2, 3, 4, 5, 6],
    "p6100": [1, 2, 3, 4],
    "50_p6240": [1, 2, 3, 4, 5, 6],
}

for col_name, categorias in categoricas_nominales.items():
    for cat in categorias:
        df_depurado = df_depurado.withColumn(
            f"{col_name}_{cat}",
            when(col(col_name) == cat, 1).otherwise(0)
        )

# Variable ordinal: nivel educativo
df_depurado = df_depurado.withColumn(
    "p3042_ord",
    when(col("p3042") == 99, None).otherwise(col("p3042"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Mostrar tipos de datos
df_depurado.printSchema()

# Ver muestra de los datos
df_depurado.show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- directorio: string (nullable = true)
 |-- secuencia_p: string (nullable = true)
 |-- orden: string (nullable = true)
 |-- periodo: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- per: string (nullable = true)
 |-- hogar: string (nullable = true)
 |-- regis: string (nullable = true)
 |-- area: string (nullable = true)
 |-- clase: string (nullable = true)
 |-- fex_c18: string (nullable = true)
 |-- dpto: string (nullable = true)
 |-- pt: string (nullable = true)
 |-- p3271: double (nullable = true)
 |-- p6040: double (nullable = true)
 |-- p6083: double (nullable = true)
 |-- p6081: double (nullable = true)
 |-- p2057: double (nullable = true)
 |-- p6080: double (nullable = true)
 |-- p6070: double (nullable = true)
 |-- p6090: double (nullable = true)
 |-- p6100: double (nullable = true)
 |-- p6160: double (nullable = true)
 |-- p6170: double (nullable = true)
 |-- p3042: double (nullable = true)
 |-- 50_pet: string (nullable = true)
 |-- 50_p6240: double (nul

## Creación de variable objetivo: INFORMAL

In [16]:
df_depurado = df_depurado.withColumn(
    "INFORMAL",
    when(
        (col("p6090_bin") == 0) &
        (col("01_p5222s2_bin") == 0) &
        (col("p6040") > 18) &
        (col("50_p6240") == 1),
        1
    ).otherwise(0)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
df_depurado.groupBy("INFORMAL").count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+------+
|INFORMAL| count|
+--------+------+
|       1|  6439|
|       0|823244|
+--------+------+

## Guardar la Base de Datos depurada en refined/

In [18]:
ruta_depurado = "s3://geih-datalake-2024/refined/geih2024_depurada/"

df_depurado.write.mode("overwrite").parquet(ruta_depurado)
print("La GEIH depurada fue guardada exitosamente en:", ruta_depurado)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

La GEIH depurada fue guardada exitosamente en: s3://geih-datalake-2024/refined/geih2024_depurada/

## Selección de variables y análisis de correlación

In [19]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.types import DoubleType

# Variables numéricas continuas
variables_continuas = ["p6040", "p3042_ord", "01_p6008"]


df_corr = df_depurado
for var in variables_continuas:
    df_corr = df_corr.withColumn(var, col(var).cast(DoubleType()))
    df_corr = df_corr.na.fill({var: 0.0})

# VectorAssembler para correlación
assembler_corr = VectorAssembler(inputCols=variables_continuas, outputCol="features_corr")
df_corr = assembler_corr.transform(df_corr).select("features_corr")

# Matriz de correlación
correlation_matrix = Correlation.corr(df_corr, "features_corr", method="pearson").head()[0].toArray()
print("Matriz de correlación (Pearson):")
print(correlation_matrix)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Matriz de correlaci?n (Pearson):
[[ 1.          0.07978755 -0.28760541]
 [ 0.07978755  1.         -0.15387361]
 [-0.28760541 -0.15387361  1.        ]]

## Reducción de dimensionalidad

In [20]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA

columnas_modelo = [
    "p3271", "p6040", "p6083", "p6081", "p2057", "p6080", "p6070", "p6090", "p6100",
    "p6160", "p6170", "p3042", "50_p6240", "90_p7495", "90_p7505", "90_p3367",
    "94_p3374", "01_p5222s2", "01_p6008", "p3271_bin", "p6083_bin", "p6081_bin",
    "p2057_bin", "p6090_bin", "01_p5222s2_bin", "90_p3367_bin",
    "p6080_1", "p6080_2", "p6080_3", "p6080_4", "p6080_5", "p6080_6",
    "p6070_1", "p6070_2", "p6070_3", "p6070_4", "p6070_5", "p6070_6",
    "p6100_1", "p6100_2", "p6100_3", "p6100_4",
    "50_p6240_1", "50_p6240_2", "50_p6240_3", "50_p6240_4", "50_p6240_5", "50_p6240_6",
    "p3042_ord"
]

# Asegurar tipo Double y nulos imputados
for col_name in columnas_modelo:
    df_depurado = df_depurado.withColumn(col_name, col(col_name).cast(DoubleType()))
    df_depurado = df_depurado.na.fill({col_name: 0.0})

# VectorAssembler
assembler = VectorAssembler(inputCols=columnas_modelo, outputCol="features")
df_vect = assembler.transform(df_depurado)

# Escalamiento
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_vect)
df_scaled = scaler_model.transform(df_vect)

# PCA
pca = PCA(k=5, inputCol="scaled_features", outputCol="features_pca")
pca_model = pca.fit(df_scaled)
df_pca = pca_model.transform(df_scaled)

print("Varianza explicada por PCA:", pca_model.explainedVariance.toArray())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Varianza explicada por PCA: [0.1440231  0.09853363 0.05889138 0.05420643 0.0489519 ]

In [21]:
# Ruta de guardado en S3
ruta_pca = "s3://geih-datalake-2024/refined/pca/"

# Guardar el DataFrame
df_pca.write.mode("overwrite").parquet(ruta_pca)

print("Datos con PCA guardados exitosamente en:", ruta_pca)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Datos con PCA guardados exitosamente en: s3://geih-datalake-2024/refined/pca/

## Modelado Regresión Logística

In [22]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Dividir los datos en entrenamiento y prueba
df_train, df_test = df_scaled.randomSplit([0.8, 0.2], seed=42)

lr = LogisticRegression(featuresCol="scaled_features", labelCol="INFORMAL", maxIter=10)
modelo_lr = lr.fit(df_train)
predicciones_lr = modelo_lr.transform(df_test)

evaluator = BinaryClassificationEvaluator(labelCol="INFORMAL", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc_lr = evaluator.evaluate(predicciones_lr)
print("AUC - Regresión logística:", auc_lr)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

AUC - Regresi?n log?stica: 0.9999470283239864

## Undersampling y rebalanceo del modelo

In [23]:
df_minoria = df_scaled.filter(col("INFORMAL") == 1)
df_mayoria = df_scaled.filter(col("INFORMAL") == 0)

# Conteo clase minoritaria
count_minoria = df_minoria.count()

# Undersample clase mayoritaria al tamaño de la clase minoritaria
df_mayoria_sampled = df_mayoria.sample(fraction=count_minoria / df_mayoria.count(), seed=42)

# Unir ambos
df_balanceado = df_minoria.union(df_mayoria_sampled)

# Repartir en entrenamiento y prueba
train_data, test_data = df_balanceado.randomSplit([0.8, 0.2], seed=42)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="scaled_features", labelCol="INFORMAL", maxIter=10)
modelo_lr = lr.fit(train_data)
predicciones_lr = modelo_lr.transform(test_data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# AUC
evaluator_auc = BinaryClassificationEvaluator(labelCol="INFORMAL", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator_auc.evaluate(predicciones_lr)

# F1-score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="INFORMAL", predictionCol="prediction", metricName="f1")
f1 = evaluator_f1.evaluate(predicciones_lr)

# Precisión
precision = MulticlassClassificationEvaluator(labelCol="INFORMAL", predictionCol="prediction", metricName="precisionByLabel").evaluate(predicciones_lr)

# Recall
recall = MulticlassClassificationEvaluator(labelCol="INFORMAL", predictionCol="prediction", metricName="recallByLabel").evaluate(predicciones_lr)

print(f"AUC: {auc}")
print(f"F1-score: {f1}")
print(f"Precisión (clase 1): {precision}")
print(f"Recall (clase 1): {recall}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

AUC: 0.9999062057566851
F1-score: 0.9976125357199083
Precisi?n (clase 1): 1.0
Recall (clase 1): 0.995334370139969

In [26]:
# Matriz de confusión
predicciones_lr.groupBy("INFORMAL", "prediction").count().orderBy("INFORMAL", "prediction").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+-----+
|INFORMAL|prediction|count|
+--------+----------+-----+
|       0|       0.0| 1280|
|       0|       1.0|    6|
|       1|       1.0| 1227|
+--------+----------+-----+

## Comparación con Random Forest

In [27]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Entrenamiento del modelo Random Forest
rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="INFORMAL", numTrees=100, maxDepth=5)
modelo_rf = rf.fit(df_balanceado)
predicciones_rf = modelo_rf.transform(df_balanceado)

# Evaluador AUC
evaluator_auc = BinaryClassificationEvaluator(labelCol="INFORMAL", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc_rf = evaluator_auc.evaluate(predicciones_rf)
print("AUC - Random Forest:", auc_rf)

# Evaluadores adicionales
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="INFORMAL", predictionCol="prediction", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="INFORMAL", predictionCol="prediction", metricName="precisionByLabel")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="INFORMAL", predictionCol="prediction", metricName="recallByLabel")

f1_rf = evaluator_f1.evaluate(predicciones_rf)
precision_rf = evaluator_precision.evaluate(predicciones_rf)
recall_rf = evaluator_recall.evaluate(predicciones_rf)

print("F1-score:", f1_rf)
print("Precisión (clase 1):", precision_rf)
print("Recall (clase 1):", recall_rf)

# Matriz de confusión
matriz_confusion_rf = predicciones_rf.groupBy("INFORMAL", "prediction").count()
matriz_confusion_rf.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

AUC - Random Forest: 0.9999177418285822
F1-score: 0.9985323733620542
Precisi?n (clase 1): 1.0
Recall (clase 1): 0.9970800676194868
+--------+----------+-----+
|INFORMAL|prediction|count|
+--------+----------+-----+
|       1|       1.0| 6439|
|       0|       0.0| 6488|
|       0|       1.0|   19|
+--------+----------+-----+

## Clustering con KMeans

In [28]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

scores = []

for k in range(2, 6):  # Puedes ajustar este rango
    kmeans = KMeans(featuresCol="features_pca", predictionCol="cluster", k=k, seed=1)
    model = kmeans.fit(df_pca)
    predictions = model.transform(df_pca)

    evaluator = ClusteringEvaluator(featuresCol="features_pca", predictionCol="cluster", metricName="silhouette")
    score = evaluator.evaluate(predictions)
    scores.append((k, score))
    print(f"k={k}, Silhouette Score={score}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

k=2, Silhouette Score=0.42531747398510267
k=3, Silhouette Score=0.4240021988282736
k=4, Silhouette Score=0.3873520444511809
k=5, Silhouette Score=0.3929674851946392

In [29]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Entrenamiento del modelo KMeans
kmeans = KMeans(featuresCol="features_pca", predictionCol="cluster", k=2, seed=1)
modelo_kmeans = kmeans.fit(df_pca)

# Predicción de clusters
df_clusters = modelo_kmeans.transform(df_pca)

# Evaluar clustering con Silhouette score
evaluator = ClusteringEvaluator(featuresCol="features_pca", predictionCol="cluster", metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(df_clusters)
print("Silhouette Score:", silhouette)

# Ver los centroides
centros = modelo_kmeans.clusterCenters()
print("Centroides del modelo:")
for idx, centroide in enumerate(centros):
    print(f"Cluster {idx}: {centroide}")

# 5. Exportar resultado con asignación de cluster a S3 (ajusta ruta si es necesario)
df_clusters.select("directorio", "secuencia_p", "orden", "cluster") \
    .write.mode("overwrite").option("header", "true") \
    .csv("s3://geih-datalake-2024/refined/output/kmeans_clusters")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Silhouette Score: 0.42531747398510267
Centroides del modelo:
Cluster 0: [-1.96418185  0.21357076  0.30793481  0.04932384 -0.17895988]
Cluster 1: [ 2.68557931 -0.29201025 -0.42103197 -0.06743932  0.24468761]

In [30]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans(featuresCol="features_pca", predictionCol="cluster", k=2, seed=1)
modelo_kmeans = kmeans.fit(df_pca)
df_clusters = modelo_kmeans.transform(df_pca)

silhouette = ClusteringEvaluator(featuresCol="features_pca", predictionCol="cluster", metricName="silhouette").evaluate(df_clusters)
print("Silhouette Score:", silhouette)
df_clusters.groupBy("cluster", "INFORMAL").count().orderBy("cluster", "INFORMAL").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Silhouette Score: 0.42531747398510267
+-------+--------+------+
|cluster|INFORMAL| count|
+-------+--------+------+
|      0|       0|474274|
|      0|       1|  4932|
|      1|       0|348970|
|      1|       1|  1507|
+-------+--------+------+

## Guardar resultados en S3

In [34]:
ruta_lr = "s3://geih-datalake-2024/refined/geih2024_depurada/modelos/predicciones_lr/"
predicciones_lr.write.mode("overwrite").parquet(ruta_lr)
print("Predicciones LR guardadas en:", ruta_lr)

ruta_rf = "s3://geih-datalake-2024/refined/geih2024_depurada/modelos/predicciones_rf/"
predicciones_rf.write.mode("overwrite").parquet(ruta_rf)
print("Predicciones RF guardadas en:", ruta_rf)

ruta_cluster = "s3://geih-datalake-2024/refined/geih2024_depurada/modelos/clustering/"
df_clusters.write.mode("overwrite").parquet(ruta_cluster)
print("Resultados clustering guardados en:", ruta_cluster)

ruta_confusion = "s3://geih-datalake-2024/refined/geih2024_depurada/modelos/matriz_confusion_rf/"
matriz_confusion_rf.write.mode("overwrite").parquet(ruta_confusion)
print("Matriz de confusión RF guardada en:", ruta_confusion)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Predicciones LR guardadas en: s3://geih-datalake-2024/refined/geih2024_depurada/modelos/predicciones_lr/
Predicciones RF guardadas en: s3://geih-datalake-2024/refined/geih2024_depurada/modelos/predicciones_rf/
Resultados clustering guardados en: s3://geih-datalake-2024/refined/geih2024_depurada/modelos/clustering/
Matriz de confusi?n RF guardada en: s3://geih-datalake-2024/refined/geih2024_depurada/modelos/matriz_confusion_rf/