In [129]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder,VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import findspark

# Point this process at the Spark installation under the given path.
# NOTE(review): the pyspark imports above have already run by the time this
# executes -- confirm this call is still needed at this position.
findspark.init('/usr/lib/python3.7/site-packages/pyspark')

# One SparkSession for the whole notebook. The PostgreSQL JDBC driver jar is
# placed on both the driver and the executor classpath.
spark = (
    SparkSession.builder
    .appName("Basic JDBC pipeline")
    .config("spark.driver.extraClassPath", "postgresql-42.2.14.jar")
    .config("spark.executor.extraClassPath", "postgresql-42.2.14.jar")
    .getOrCreate()
)

In [87]:
## Load the construction data set (header row present, column types inferred)
construccion_df = spark.read.csv("Base_Anonimizada2022.csv",header=True,inferSchema=True)

## Load the lookup that maps each canton to its code
## (it is cleaned later, inside construcciones_region_df_func)

cantones_df =spark.read.csv("SEN_GEOGRAFICO_1.csv",header=True,inferSchema=True)

## Load the lookup that maps each canton to its region

regiones_df = spark.read.csv("division_territorial_por_region.csv",header=True,inferSchema=True)

## Load the ENAHO data set

enaho_2022_df = spark.read.csv("BdBasePublica.csv",header=True,inferSchema=True)

                                                                                

In [88]:
def construcciones_region_df_func(constru_df,canton_df,region_df):
    """Average construction metrics per region, with a numeric region code.

    Pipeline:
      1. Keep the rows of ``constru_df`` with ``claobr == 1`` (presumably the
         residential works, judging by the original variable names -- TODO
         confirm) and average six metrics per (pro_num_prov, pc_num_cant).
      2. Split ``canton_df["CodigoDTA"]`` on "," into ``Codigo_DTA`` and
         ``Canton`` and strip double quotes from every remaining column.
      3. Derive ``Codigo_DTA`` in ``region_df`` from the first three
         characters of ``CODIGO`` and keep distinct
         (Codigo_DTA, CANTON, REGION) rows.
      4. Inner-join the canton averages to the canton lookup, left-join the
         region lookup, and average every remaining column per ``REGION``.
      5. Add ``Codigo_Region`` (1-6) from the region name; names outside the
         six listed ones get null because the ``when`` chain has no
         ``otherwise``.

    Args:
        constru_df: Spark DataFrame with claobr, pro_num_prov, pc_num_cant,
            num_obras, arecon, numviv, numapo, numdor and valobr columns.
        canton_df: Spark DataFrame with a ``CodigoDTA`` column.
        region_df: Spark DataFrame with CODIGO, CANTON and REGION columns.

    Returns:
        Spark DataFrame with ``REGION``, one ``reg_<alias>`` column per
        averaged metric, and ``Codigo_Region``.
    """
    claves_canton = ["pro_num_prov", "pc_num_cant"]
    # Source metric -> canton-level alias. The first alias really is "pro_"
    # (not "prom_"); it is kept because it feeds the final column name.
    alias_promedio = {
        "num_obras": "pro_num_obras",
        "arecon": "prom_arecon",
        "numviv": "prom_numviv",
        "numapo": "prom_numapo",
        "numdor": "prom_numdor",
        "valobr": "prom_valobr",
    }

    # Step 1: canton-level averages.
    obras_filtradas_df = constru_df.filter(constru_df.claobr==1).select(*claves_canton, *alias_promedio)
    promedio_por_canton_df = obras_filtradas_df.groupby(*claves_canton).agg(
        *[F.avg(origen).alias(alias) for origen, alias in alias_promedio.items()]
    )

    # Step 2: canton lookup -- "code,name" is split into two columns, then
    # double quotes are removed from every column.
    partes_codigo = F.split(canton_df["CodigoDTA"], ",")
    cantones_separados_df = (
        canton_df
        .withColumn("Codigo_DTA", partes_codigo[0])
        .withColumn("Canton", partes_codigo[1])
        .drop("CodigoDTA", "Nombre")
    )
    cantones_sin_comillas_df = cantones_separados_df.select(
        [F.regexp_replace(nombre, '"', '').alias(nombre) for nombre in cantones_separados_df.columns]
    )

    # Step 3: region lookup keyed by the first three characters of CODIGO.
    regiones_por_canton_df = (
        region_df
        .withColumn("Codigo_DTA", F.substring("CODIGO", 1, 3))
        .select("Codigo_DTA", "CANTON", "REGION")
        .distinct()
    )

    # Step 4: attach canton code, then region, and drop the helper columns.
    coincide_canton = promedio_por_canton_df["pc_num_cant"]==cantones_sin_comillas_df["Codigo_DTA"]
    canton_con_codigo_df = (
        promedio_por_canton_df
        .join(cantones_sin_comillas_df, coincide_canton, how="inner")
        .drop("pc_num_cant", "pro_num_prov")
    )
    canton_con_region_df = (
        canton_con_codigo_df
        .join(regiones_por_canton_df, on="Codigo_DTA", how="left")
        .drop("Canton", "CANTON", "Codigo_DTA")
    )

    columnas_a_promediar = [nombre for nombre in canton_con_region_df.columns if nombre!="REGION"]
    promedio_por_region_df = canton_con_region_df.groupby("REGION").agg(
        *[F.avg(nombre).alias("reg_"+nombre) for nombre in columnas_a_promediar]
    )

    # Step 5: numeric region code, built as one chained when(...) expression.
    codigos_region = (
        ("CENTRAL", 1),
        ("CHOROTEGA", 2),
        ("PACIFICO CENTRAL", 3),
        ("BRUNCA", 4),
        ("HUETAR CARIBE", 5),
        ("HUETAR NORTE", 6),
    )
    columna_region = promedio_por_region_df["REGION"]
    (primera_region, primer_codigo), *otras_regiones = codigos_region
    codigo_expr = F.when(columna_region==primera_region, primer_codigo)
    for nombre_region, codigo in otras_regiones:
        codigo_expr = codigo_expr.when(columna_region==nombre_region, codigo)

    return promedio_por_region_df.withColumn("Codigo_Region", codigo_expr)

In [89]:
# Build the per-region construction averages from the three raw CSV DataFrames.
construccion_prom_region_2022 = construcciones_region_df_func(construccion_df,cantones_df,regiones_df)

# Preview the result.
construccion_prom_region_2022.show()

+----------------+------------------+------------------+------------------+-----------------+------------------+--------------------+-------------+
|          REGION| reg_pro_num_obras|   reg_prom_arecon|   reg_prom_numviv|  reg_prom_numapo|   reg_prom_numdor|     reg_prom_valobr|Codigo_Region|
+----------------+------------------+------------------+------------------+-----------------+------------------+--------------------+-------------+
|    HUETAR NORTE|1.0503136153898356| 71.42357793198545|0.9486050771143969|4.714806704849421| 2.172745293336891|1.9334500496198073E7|            6|
|   HUETAR CARIBE|1.1466133138681152|59.878152501614835|0.9699997159750016| 3.98014685977476|1.8833681343661388|1.5375398208581805E7|            5|
|         CENTRAL|1.3386748779531557|133.60794488188338|1.0805488324926336|6.747149989782027|2.8766782482438202| 4.539048286811655E7|            1|
|          BRUNCA|1.0385502365644046|  69.4399004299654|0.9254513138431006| 4.61359800403076| 2.096031027513646|

In [90]:
def enaho_func(enaho_df):
    """Collapse the ENAHO rows into one row per household.

    The ENAHO data mixes household-level and individual-level variables.
    This function keeps the household-level ones and aggregates the
    individual-level ones per household.

    Steps:
      1. Project the columns that are actually used.
      2. Derive the binary ``Tenencia_Vivienda``: the housing-tenure variable
         ``V2A`` has 5 categories; values 1 and 2 become 1 (own home), every
         other value becomes 0 (home that is not owned).
      3. Build a household identifier ``id`` as the running count of rows
         with ``LINEA == 1``.
      4. Group by household and aggregate ``Escolari`` (sum), ``C2A4``
         (average) and ``ithb`` (average).
      5. Rename the household-level columns to descriptive names.

    Args:
        enaho_df: Spark DataFrame with at least the columns ID_HOGAR, LINEA,
            REGION, ZONA, ithb, Escolari, C2A4, TamViv, V18J1, V18F1, V2A.

    Returns:
        Spark DataFrame ordered by ``id`` with columns id, Region_Geo,
        Tenencia_Vivienda, Cantidad_Personas, Cantidad_vehiculos,
        Cantidad_Computadoras, ZONA, suma_escolari_hogar,
        suma_horas_trab_hogar and Ingreso_Total_Bruto_Hogar.
    """
    columnas_usadas = ["ID_HOGAR","LINEA","REGION","ZONA","ithb","Escolari","C2A4","TamViv","V18J1","V18F1","V2A"]
    enaho_2022_variables_df = enaho_df.select(*columnas_usadas)

    # Work from the projection above. The previous version built it and then
    # transformed the raw frame instead, taking the condition column from a
    # different DataFrame than the one being transformed.
    enaho_2022_variables_binario_df = (
        enaho_2022_variables_df
        .withColumn("Tenencia_Vivienda", F.when(F.col("V2A").isin([1,2]),1).otherwise(0))
        .drop("V2A","ID_HOGAR")
    )

    # Running count of LINEA == 1 rows, in the order given by
    # monotonically_increasing_id().
    # NOTE(review): this assumes the rows keep their source order and that
    # LINEA == 1 marks the first row of each household -- confirm.
    # The window has no partitionBy, so Spark moves all rows to a single
    # partition to evaluate it (see the WindowExec warning it logs).
    windowSpec = Window.orderBy(F.monotonically_increasing_id()).rowsBetween(Window.unboundedPreceding, 0)
    es_primera_linea = F.when(F.col("LINEA") == 1, 1).otherwise(0)

    enaho_2022_variables_binario_df = (
        enaho_2022_variables_binario_df
        .withColumn("id", F.sum(es_primera_linea).over(windowSpec))
        .drop("LINEA")
    )

    # NOTE(review): despite its "suma_" prefix, suma_horas_trab_hogar is an
    # average (F.avg). The name is kept because it is part of the output.
    enaho_2022_hogar_agr_df = (
        enaho_2022_variables_binario_df
        .groupby("id","REGION","Tenencia_Vivienda","TamViv","V18J1","V18F1","ZONA")
        .agg(
            F.sum("Escolari").alias("suma_escolari_hogar"),
            F.avg("C2A4").alias("suma_horas_trab_hogar"),
            F.avg("ithb").alias("Ingreso_Total_Bruto_Hogar"),
        )
        .orderBy(F.col("id").asc())
    )

    enaho_2022_hogar_renombrado_df = (
        enaho_2022_hogar_agr_df
        .withColumnRenamed("TamViv","Cantidad_Personas")
        .withColumnRenamed("V18J1","Cantidad_vehiculos")
        .withColumnRenamed("V18F1","Cantidad_Computadoras")
        .withColumnRenamed("REGION","Region_Geo")
    )

    return enaho_2022_hogar_renombrado_df

In [91]:
# Build the household-level ENAHO table from the raw ENAHO CSV DataFrame.
enaho_2022 = enaho_func(enaho_2022_df)

# Preview the result.
enaho_2022.show()

24/05/10 23:37:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:37:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+----------+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+
| id|Region_Geo|Tenencia_Vivienda|Cantidad_Personas|Cantidad_vehiculos|Cantidad_Computadoras|ZONA|suma_escolari_hogar|suma_horas_trab_hogar|Ingreso_Total_Bruto_Hogar|
+---+----------+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+
|  1|       1.0|                1|              2.0|               1.0|                 null| 1.0|               12.0|                 null|                1029167.0|
|  2|       1.0|                1|              4.0|              null|                 null| 1.0|               46.0|                 47.5|                2448333.0|
|  3|       1.0|                0|              3.0|              null|                 null| 1.0|               30.0|                 48.0|                 494000.0

24/05/10 23:37:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:37:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [103]:
def unir_datos(enaho_df,construccion_df):
    """Attach the regional construction columns to every ENAHO row.

    Performs a left join on ``enaho_df.Region_Geo ==
    construccion_df.Codigo_Region``, so ENAHO rows without a matching region
    are kept. The columns id, Region_Geo, Codigo_Region and REGION are then
    dropped from the result.

    Args:
        enaho_df: Spark DataFrame with a ``Region_Geo`` column.
        construccion_df: Spark DataFrame with a ``Codigo_Region`` column.

    Returns:
        The joined Spark DataFrame without the four dropped columns.
    """
    misma_region = enaho_df["Region_Geo"]==construccion_df["Codigo_Region"]
    columnas_auxiliares = ('id','Region_Geo','Codigo_Region','REGION')
    return enaho_df.join(construccion_df, misma_region, how='left').drop(*columnas_auxiliares)

In [104]:
# Join the household-level ENAHO table with the regional construction averages.
tenencia_vivienda_df = unir_datos(enaho_2022,construccion_prom_region_2022)

# Preview the result.
tenencia_vivienda_df.show()

24/05/10 23:42:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 2

+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+
|Tenencia_Vivienda|Cantidad_Personas|Cantidad_vehiculos|Cantidad_Computadoras|ZONA|suma_escolari_hogar|suma_horas_trab_hogar|Ingreso_Total_Bruto_Hogar| reg_pro_num_obras|   reg_prom_arecon|   reg_prom_numviv|  reg_prom_numapo|   reg_prom_numdor|    reg_prom_valobr|
+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+
|                1|              4.0|               2.0|                  2.0| 1.0|               50.0|                 44.0|                3550000.0|1.3386748779531557|133.60794488188338|1.08054883249

In [105]:
def guardar_a_postgres(construccion,enaho,base_unida,
                       url="jdbc:postgresql://host.docker.internal:5433/postgres",
                       user="postgres",
                       password="testPassword"):
    """Write the three DataFrames to PostgreSQL over JDBC, replacing each table.

    Tables are written in this order, each with mode ``overwrite``:
    ``construccion`` -> "construccion_df", ``enaho`` -> "enaho_df",
    ``base_unida`` -> "tenencia_vivienda_df".

    Args:
        construccion: Spark DataFrame stored as table "construccion_df".
        enaho: Spark DataFrame stored as table "enaho_df".
        base_unida: Spark DataFrame stored as table "tenencia_vivienda_df".
        url: JDBC connection URL. Defaults to the value previously hard-coded
            in this function.
        user: Database user. Defaults to the previously hard-coded value.
        password: Database password. Defaults to the previously hard-coded
            value.
            NOTE(review): the default is a credential committed in source --
            pass a real secret in explicitly rather than editing the default.

    Returns:
        None.
    """
    tablas_destino = (
        (construccion, "construccion_df"),
        (enaho, "enaho_df"),
        (base_unida, "tenencia_vivienda_df"),
    )

    # One writer chain shared by all tables instead of three pasted copies.
    for datos, nombre_tabla in tablas_destino:
        (
            datos.write
            .format("jdbc")
            .mode('overwrite')
            .option("url", url)
            .option("user", user)
            .option("password", password)
            .option("dbtable", nombre_tabla)
            .save()
        )

In [106]:
# Persist the regional construction table, the household ENAHO table and the joined table.
guardar_a_postgres(construccion_prom_region_2022,enaho_2022,tenencia_vivienda_df)

24/05/10 23:42:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 23:42:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/10 2

In [107]:
# Read the "construccion_df" table back from PostgreSQL.
# NOTE(review): this rebinds `construccion_df`, the name the CSV-loading cell
# used for the raw construction data; re-running earlier cells after this one
# would hand them this table instead of the raw data.
construccion_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres")
    .option("user", "postgres")
    .option("password", "testPassword")
    .option("dbtable", "construccion_df")
    .load()
)

construccion_df.show()

+----------------+------------------+------------------+------------------+-----------------+------------------+--------------------+-------------+
|          REGION| reg_pro_num_obras|   reg_prom_arecon|   reg_prom_numviv|  reg_prom_numapo|   reg_prom_numdor|     reg_prom_valobr|Codigo_Region|
+----------------+------------------+------------------+------------------+-----------------+------------------+--------------------+-------------+
|    HUETAR NORTE|1.0503136153898356| 71.42357793198545|0.9486050771143969|4.714806704849421| 2.172745293336891|1.9334500496198073E7|            6|
|   HUETAR CARIBE|1.1466133138681152|59.878152501614835|0.9699997159750016| 3.98014685977476|1.8833681343661388|1.5375398208581805E7|            5|
|         CENTRAL|1.3386748779531557|133.60794488188338|1.0805488324926336|6.747149989782027|2.8766782482438202| 4.539048286811655E7|            1|
|          BRUNCA|1.0385502365644046|  69.4399004299654|0.9254513138431006| 4.61359800403076| 2.096031027513646|

In [108]:
# Read the "enaho_df" table back from PostgreSQL.
enaho_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres")
    .option("user", "postgres")
    .option("password", "testPassword")
    .option("dbtable", "enaho_df")
    .load()
)

enaho_df.show()

+---+----------+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+
| id|Region_Geo|Tenencia_Vivienda|Cantidad_Personas|Cantidad_vehiculos|Cantidad_Computadoras|ZONA|suma_escolari_hogar|suma_horas_trab_hogar|Ingreso_Total_Bruto_Hogar|
+---+----------+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+
|  1|       1.0|                1|              2.0|               1.0|                 null| 1.0|               12.0|                 null|                1029167.0|
|  2|       1.0|                1|              4.0|              null|                 null| 1.0|               46.0|                 47.5|                2448333.0|
|  3|       1.0|                0|              3.0|              null|                 null| 1.0|               30.0|                 48.0|                 494000.0

In [109]:
# Read the "tenencia_vivienda_df" table back from PostgreSQL. From here on
# `tenencia_vivienda_df` refers to the database copy, not the in-memory join.
tenencia_vivienda_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres")
    .option("user", "postgres")
    .option("password", "testPassword")
    .option("dbtable", "tenencia_vivienda_df")
    .load()
)

tenencia_vivienda_df.show()

+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+
|Tenencia_Vivienda|Cantidad_Personas|Cantidad_vehiculos|Cantidad_Computadoras|ZONA|suma_escolari_hogar|suma_horas_trab_hogar|Ingreso_Total_Bruto_Hogar| reg_pro_num_obras|   reg_prom_arecon|   reg_prom_numviv|  reg_prom_numapo|   reg_prom_numdor|    reg_prom_valobr|
+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+
|                1|              4.0|               2.0|                  2.0| 1.0|               50.0|                 44.0|                3550000.0|1.3386748779531557|133.60794488188338|1.08054883249

In [170]:
# Replace null observations with 0
tenencia_vivienda_df_1 =tenencia_vivienda_df.fillna(0)

# Preview the filled table.
tenencia_vivienda_df_1.show()

# List the column names that are available as model inputs.
print(tenencia_vivienda_df_1.columns)

+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+
|Tenencia_Vivienda|Cantidad_Personas|Cantidad_vehiculos|Cantidad_Computadoras|ZONA|suma_escolari_hogar|suma_horas_trab_hogar|Ingreso_Total_Bruto_Hogar| reg_pro_num_obras|   reg_prom_arecon|   reg_prom_numviv|  reg_prom_numapo|   reg_prom_numdor|    reg_prom_valobr|
+-----------------+-----------------+------------------+---------------------+----+-------------------+---------------------+-------------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+
|                1|              4.0|               2.0|                  2.0| 1.0|               50.0|                 44.0|                3550000.0|1.3386748779531557|133.60794488188338|1.08054883249

In [176]:
# Every column except the label 'Tenencia_Vivienda' is used as a feature.
feature_columns = list(filter(lambda nombre: nombre != 'Tenencia_Vivienda', tenencia_vivienda_df_1.columns))

# Pack the feature columns into a single vector column named "features" and
# keep only that vector plus the label.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
vector_df = assembler.transform(tenencia_vivienda_df_1).select(['features','Tenencia_Vivienda'])
vector_df.show()

+--------------------+-----------------+
|            features|Tenencia_Vivienda|
+--------------------+-----------------+
|[4.0,2.0,2.0,1.0,...|                1|
|[3.0,1.0,1.0,1.0,...|                1|
|[1.0,1.0,1.0,1.0,...|                1|
|[4.0,1.0,1.0,1.0,...|                0|
|[4.0,1.0,2.0,1.0,...|                1|
|[5.0,1.0,1.0,1.0,...|                0|
|[3.0,1.0,2.0,1.0,...|                1|
|[4.0,1.0,1.0,1.0,...|                1|
|[3.0,2.0,2.0,1.0,...|                1|
|[5.0,1.0,1.0,1.0,...|                0|
|[4.0,1.0,1.0,1.0,...|                0|
|[4.0,1.0,1.0,1.0,...|                1|
|[2.0,1.0,1.0,1.0,...|                0|
|[3.0,1.0,1.0,1.0,...|                1|
|[4.0,1.0,2.0,1.0,...|                1|
|[2.0,1.0,2.0,1.0,...|                1|
|[3.0,1.0,1.0,1.0,...|                0|
|[3.0,1.0,2.0,1.0,...|                1|
|[3.0,1.0,1.0,1.0,...|                1|
|[4.0,2.0,2.0,1.0,...|                0|
+--------------------+-----------------+
only showing top

In [177]:
# Scale the assembled "features" vector into "columns_scaled".
# NOTE(review): the scaler is fitted on all of vector_df, i.e. before the
# train/test split done in the modelling cells, so its statistics also
# include rows that later land in the test set -- confirm this is intended.
standard_scaler = StandardScaler(inputCol='features', outputCol='columns_scaled')
scale_model = standard_scaler.fit(vector_df)

scaled_df = scale_model.transform(vector_df)

# Show the results
vector_scaled_df = scaled_df.select(['columns_scaled','Tenencia_Vivienda'])
vector_scaled_df.show()
vector_scaled_df.printSchema()

+--------------------+-----------------+
|      columns_scaled|Tenencia_Vivienda|
+--------------------+-----------------+
|[2.54800381482655...|                1|
|[1.91100286111991...|                1|
|[0.63700095370663...|                1|
|[2.54800381482655...|                0|
|[2.54800381482655...|                1|
|[3.18500476853319...|                0|
|[1.91100286111991...|                1|
|[2.54800381482655...|                1|
|[1.91100286111991...|                1|
|[3.18500476853319...|                0|
|[2.54800381482655...|                0|
|[2.54800381482655...|                1|
|[1.27400190741327...|                0|
|[1.91100286111991...|                1|
|[2.54800381482655...|                1|
|[1.27400190741327...|                1|
|[1.91100286111991...|                0|
|[1.91100286111991...|                1|
|[1.91100286111991...|                1|
|[2.54800381482655...|                0|
+--------------------+-----------------+
only showing top

In [200]:
# First model (logistic regression)

# Train/test split (70% / 30%, fixed seed)
train_data, test_data = vector_scaled_df.randomSplit([0.7, 0.3], seed=123)

# Logistic regression on the scaled feature vector, predicting Tenencia_Vivienda.
lr = LogisticRegression(featuresCol="columns_scaled", labelCol='Tenencia_Vivienda')

lr_model = lr.fit(train_data)

# Predict on the test data
prediccion_lr = lr_model.transform(test_data)


                                                                                

In [201]:
# Score the logistic-regression predictions on the test set. No metricName is
# set here, so the evaluator's default metric is what gets printed.
evaluator = BinaryClassificationEvaluator(labelCol='Tenencia_Vivienda')
area_under_curve = evaluator.evaluate(prediccion_lr)
print("Area Under ROC Curve:", area_under_curve)


Area Under ROC Curve: 0.6736479508797696


In [203]:
# Second model (decision tree)

# Train/test split (same proportions and seed as the logistic-regression cell)
train_data, test_data = vector_scaled_df.randomSplit([0.7, 0.3], seed=123)


dt = DecisionTreeClassifier(featuresCol="columns_scaled", labelCol="Tenencia_Vivienda")

# Hyper-parameter grid for cross-validation: 2 depths x 2 bin counts = 4 candidates
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10]) \
    .addGrid(dt.maxBins, [20, 30]) \
    .build()



# 5-fold cross-validation, scored with the `evaluator` defined in the
# logistic-regression evaluation cell. The seed fixes the fold assignment so
# the selected model and the reported score are reproducible across runs;
# without it the folds are drawn differently on every execution.
cv = CrossValidator(estimator=dt,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=123)

# Train the model
cvModel = cv.fit(train_data)

# Predict on the test data
prediccion_dt = cvModel.transform(test_data)



24/05/11 02:46:19 WARN CacheManager: Asked to cache already cached data.
24/05/11 02:46:19 WARN CacheManager: Asked to cache already cached data.


In [204]:
# Score the cross-validated decision-tree predictions on the test set, with
# the same evaluator configuration used for the logistic regression.
evaluator = BinaryClassificationEvaluator(labelCol='Tenencia_Vivienda')
roc_dt = evaluator.evaluate(prediccion_dt)
print(f"Area bajo la curva ROC: {roc_dt}")

Area bajo la curva ROC: 0.5696153330164866
