In [272]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [273]:
spark = SparkSession.builder.getOrCreate()
local_route = "C:/Users/sergio.fuentes/Developer/IntelliJ/Padron/src/hostPath/"
file_name = "Rango_Edades_Seccion_202205.csv"
padron_csv = local_route + file_name
n_MAX = 5

In [274]:
padron = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("emptyValue", 0)\
    .option("delimiter", ";")\
.load(padron_csv)

In [275]:
padron2 = padron.withColumn("DESC_DISTRITO", trim(col("desc_distrito")))\
    .withColumn("DESC_BARRIO", trim(col("desc_barrio")))

In [276]:
padron_mal = spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .option("delimiter",";")\
.load(padron_csv)

In [277]:
padron_cambiado = padron_mal.na.fill(value=0)\
    .withColumn("DESC_DISTRITO", trim(col("desc_distrito")))\
    .withColumn("DESC_BARRIO", trim(col("desc_barrio")))

In [278]:
padron2.select(countDistinct("desc_barrio").alias("n_barrios")).show(n_MAX)

+---------+
|n_barrios|
+---------+
|      131|
+---------+



In [279]:
padron_cambiado.createOrReplaceTempView("padron")

spark.sql("select count(distinct(desc_barrio)) as n_barrios from padron").show(n_MAX)

+---------+
|n_barrios|
+---------+
|      131|
+---------+



In [280]:
padron3 = padron_cambiado.withColumn("longitud", length(col("desc_distrito")))

In [281]:
padron4 = padron3.withColumn("valor5", lit(5))  
padron4.show(n_MAX)

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|longitud|valor5|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+------+
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           0|               3|               2|                 0|                 0|       6|     5|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           1|               4|               2|                 1|                 0|       6|     5|
|         

In [282]:
padron5 = padron4.drop(col("valor5"))
padron5.show(n_MAX)

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|longitud|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           0|               3|               2|                 0|                 0|       6|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           1|               4|               2|                 1|                 0|       6|
|           1|       CENTRO|            101| 

In [283]:
padron_particionado = padron5.repartition(col("DESC_DISTRITO"), col("DESC_BARRIO"))

In [284]:
padron_particionado.cache()

DataFrame[COD_DISTRITO: int, DESC_DISTRITO: string, COD_DIST_BARRIO: int, DESC_BARRIO: string, COD_BARRIO: int, COD_DIST_SECCION: int, COD_SECCION: int, COD_EDAD_INT: int, EspanolesHombres: int, EspanolesMujeres: int, ExtranjerosHombres: int, ExtranjerosMujeres: int, longitud: int]

In [285]:
padron_particionado.groupBy(col("desc_barrio"), col("desc_distrito"))\
.agg(count(col("espanolesHombres")).alias("espanolesHombres"),\
    count(col("espanolesMujeres")).alias("espanolesMujeres"),\
    count(col("extranjerosHombres")).alias("extranjerosHombres"),\
    count(col("extranjerosMujeres")).alias("extranjerosMujeres"))\
.orderBy(desc("extranjerosMujeres"), desc("extranjerosHombres"))\
.show(n_MAX)

+-------------+-------------------+----------------+----------------+------------------+------------------+
|  desc_barrio|      desc_distrito|espanolesHombres|espanolesMujeres|extranjerosHombres|extranjerosMujeres|
+-------------+-------------------+----------------+----------------+------------------+------------------+
|       ALUCHE|             LATINA|            5749|            5749|              5749|              5749|
| PUEBLO NUEVO|      CIUDAD LINEAL|            4498|            4498|              4498|              4498|
|     EL PILAR|FUENCARRAL-EL PARDO|            4366|            4366|              4366|              4366|
|  LAS AGUILAS|             LATINA|            4100|            4100|              4100|              4100|
|PINAR DEL REY|          HORTALEZA|            4078|            4078|              4078|              4078|
+-------------+-------------------+----------------+----------------+------------------+------------------+
only showing top 5 rows



In [286]:
spark.catalog.clearCache()

In [287]:
df1 = padron_particionado.select(col("DESC_BARRIO"), col("DESC_DISTRITO"), col("ESPANOLESHOMBRES"))\
    .groupBy(col("DESC_BARRIO"), col("DESC_DISTRITO"))\
    .agg(sum(col("ESPANOLESHOMBRES")).alias("ESPANOLESHOMBRES"))

In [288]:
res = df1.join(padron,  (padron.DESC_BARRIO == df1.DESC_BARRIO) &\
       (padron.DESC_DISTRITO == df1.DESC_DISTRITO), "inner")
res.show()

+-----------+-------------+----------------+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|DESC_BARRIO|DESC_DISTRITO|ESPANOLESHOMBRES|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+-----------+-------------+----------------+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
+-----------+-------------+----------------+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+



In [289]:
padron_ventana = padron.withColumn("TotalEspHom", sum(col("espanoleshombres"))\
    .over(Window.partitionBy("DESC_DISTRITO", "DESC_BARRIO")))
padron_ventana.show(n_MAX)

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+-----------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|TotalEspHom|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+-----------+
|           6|TETUAN              |            605|VALDEACEDERAS       |         5|            6095|         95|          34|               7|               6|                 5|                 4|       9479|
|           6|TETUAN              |            605|VALDEACEDERAS       |         5|            6095|         95|          35|               8|               8| 

In [290]:
distritos = ("BARAJAS", "CENTRO","RETIRO")

padron_pivot = padron_particionado\
    .groupBy("cod_edad_int")\
    .pivot("desc_distrito", distritos)\
    .sum("espanolesMujeres")\
    .orderBy(col("cod_edad_int"))

padron_pivot.show(n_MAX)

+------------+-------+------+------+
|cod_edad_int|BARAJAS|CENTRO|RETIRO|
+------------+-------+------+------+
|           0|    144|   256|   303|
|           1|    151|   237|   315|
|           2|    182|   191|   379|
|           3|    170|   218|   373|
|           4|    196|   229|   414|
+------------+-------+------+------+
only showing top 5 rows



In [291]:
padron_porcen = padron_pivot.withColumn("porcentaje_barajas", round(col("barajas") / (col("barajas") + col("centro") + col("retiro")) * 100, 2))\
    .withColumn("porcentaje_centro", round(col("centro") / (col("barajas") + col("CENTRO") + col("RETIRO")) * 100, 2))\
    .withColumn("porcentaje_retiro", round(col("retiro") / (col("BARAJAS") + col("CENTRO") + col("RETIRO")) * 100, 2))

In [292]:
padron.write.format("csv")\
    .option("header", "True")\
    .mode("overwrite")\
    .partitionBy("desc_distrito", "desc_barrio")\
    .save(local_route + "saveCSV")

In [None]:
padron.write.format("parquet")\
    .mode("overwrite")\
    .partitionBy("desc_distrito", "desc_barrio")\
    .save(local_route + "saveCSV")