In [None]:
import pandas as pd
import zipfile
import os

ruta_extraccion = "/Users/dayth/OneDrive/Escritorio/Proyectos/EDA + ML/data/external/casen/"

os.makedirs(ruta_extraccion, exist_ok=True)

with zipfile.ZipFile("/Users/dayth/OneDrive/Escritorio/Proyectos/EDA + ML/data/external/casen/Base de datos Casen 2022 STATA_18 marzo 2024.dta.zip", 'r') as z:
    archivo_extraido = z.extract(z.namelist()[0], path=ruta_extraccion)
    df_principal = pd.read_stata(archivo_extraido, convert_categoricals=False)

with zipfile.ZipFile("/Users/dayth/OneDrive/Escritorio/Proyectos/EDA + ML/data/external/casen/Base de datos provincia y comuna Casen 2022 STATA.dta.zip", 'r') as z:
    archivo_extraido = z.extract(z.namelist()[0], path=ruta_extraccion)
    df_comunas = pd.read_stata(archivo_extraido, convert_categoricals=False)

df_rm = df_principal.merge(df_comunas[['folio', 'id_persona', 'comuna']], on=['folio', 'id_persona'])
df_rm = df_rm[df_rm['comuna'].astype(str).str.startswith('13')]

comunas_rm = {13101: 'Santiago', 13102: 'Cerrillos', 13103: 'Cerro Navia', 13104: 'Conchalí', 13105: 'El Bosque', 13106: 'Estación Central', 13107: 'Huechuraba', 13108: 'Independencia', 13109: 'La Cisterna', 13110: 'La Florida', 13111: 'La Granja', 13112: 'La Pintana', 13113: 'La Reina', 13114: 'Las Condes', 13115: 'Lo Barnechea', 13116: 'Lo Espejo', 13117: 'Lo Prado', 13118: 'Macul', 13119: 'Maipú', 13120: 'Ñuñoa', 13121: 'Pedro Aguirre Cerda', 13122: 'Peñalolén', 13123: 'Providencia', 13124: 'Pudahuel', 13125: 'Quilicura', 13126: 'Quinta Normal', 13127: 'Recoleta', 13128: 'Renca', 13129: 'San Joaquín', 13130: 'San Miguel', 13131: 'San Ramón', 13132: 'Vitacura', 13201: 'Puente Alto', 13202: 'Pirque', 13203: 'San José de Maipo', 13301: 'Colina', 13302: 'Lampa', 13303: 'Tiltil', 13401: 'San Bernardo', 13402: 'Buin', 13403: 'Calera de Tango', 13404: 'Paine', 13501: 'Melipilla', 13502: 'Alhué', 13503: 'Curacaví', 13504: 'María Pinto', 13505: 'San Pedro', 13601: 'Talagante', 13602: 'El Monte', 13603: 'Isla de Maipo', 13604: 'Padre Hurtado', 13605: 'Peñaflor'}

def calc_comuna_stats(grupo):
    if len(grupo) < 10: return None
    codigo = int(grupo['comuna'].iloc[0])

    def pct(serie, condicion): return (condicion(serie.dropna())).mean() * 100 if len(serie.dropna()) > 0 else 0

    stats = {
        'codigo_comuna': codigo,
        'nombre_comuna': comunas_rm.get(codigo, f'Comuna {codigo}'),
        'n_registros': len(grupo),
        'pct_casa': pct(grupo['v1'], lambda x: x == 1),
        'pct_departamento': pct(grupo['v1'], lambda x: x == 2),
        'pct_casa_depto': pct(grupo['v1'], lambda x: (x == 1) | (x == 2)),
        'pct_vivienda_precaria': pct(grupo['v1'], lambda x: x >= 4),
        'pct_paredes_solidas': pct(grupo['v2'], lambda x: x <= 2),
        'pct_techo_solido': pct(grupo['v3'], lambda x: x <= 2),
        'pct_piso_bueno': pct(grupo['v4'], lambda x: x <= 2),
        'pct_agua_red_publica': pct(grupo['v5'], lambda x: x == 1),
        'pct_wc_bueno': pct(grupo['v6'], lambda x: x <= 2),
        'pct_cocina_moderna': pct(grupo['v7'], lambda x: x <= 2),
        'pct_propia_total': pct(grupo['v9'], lambda x: x <= 2),
        'pct_arrendada': pct(grupo['v9'], lambda x: x == 3),
    }

    vars_ingreso = [c for c in grupo.columns if 'ytot' in c.lower()]
    if vars_ingreso:
        ingresos = grupo[grupo[vars_ingreso[0]].notna() & (grupo[vars_ingreso[0]] > 0)][vars_ingreso[0]]
        if len(ingresos) > 0:
            stats.update({
                'ingreso_promedio': ingresos.mean(),
                'ingreso_mediana': ingresos.median(),
            })

    stats['indice_calidad_materiales'] = (stats['pct_paredes_solidas'] + stats['pct_techo_solido'] + stats['pct_piso_bueno']) / 3
    stats['indice_servicios_basicos'] = (stats['pct_agua_red_publica'] + stats['pct_wc_bueno'] + stats['pct_cocina_moderna']) / 3
    stats['indice_calidad_vivienda_general'] = (stats['indice_calidad_materiales'] + stats['indice_servicios_basicos'] + stats['pct_casa_depto']) / 3

    return stats

resultados = []
for comuna in df_rm['comuna'].unique():
    stats = calc_comuna_stats(df_rm[df_rm['comuna'] == comuna])
    if stats: resultados.append(stats)

df_final = pd.DataFrame(resultados).round(2).sort_values('codigo_comuna')
df_final.to_csv('../../data/interim/caracteristicas_vivienda_rm_casen2022.csv', index=False, encoding='utf-8-sig')

print(f"CSV creado: {df_final.shape[0]} comunas, {df_final.shape[1]} variables")
print("Top 5 comunas por calidad:")
print(df_final.nlargest(5, 'indice_calidad_vivienda_general')[['nombre_comuna', 'indice_calidad_vivienda_general']])

One or more strings in the dta file could not be decoded using utf-8, and
so the fallback encoding of latin-1 is being used.  This can happen when a file
has been incorrectly encoded by Stata or some other software. You should verify
the string values returned are correct.
  df_principal = pd.read_stata(archivo_extraido, convert_categoricals=False)


CSV creado: 52 comunas, 20 variables
Top 5 comunas por calidad:
    nombre_comuna  indice_calidad_vivienda_general
48       La Reina                            85.90
16   Lo Barnechea                            82.82
31  Padre Hurtado                            82.28
42          Paine                            80.99
14       Vitacura                            80.64
