# Ejercicio de las prácticas del Padrón.

Antes de hacer nada, tenemos que __crear la sesión__.  

Para ello utilizaremos __findspark.init()__, que nos permitirá utilizar las funciones de pyspark.  
Después configuraremos nuestra sesión con __SparkSession__.

In [1]:
# Crear sesión
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession
        .builder
        .appName("PadronPySpark")
        .getOrCreate())

Pyspark nos permite importar __clases__ con diferentes funcionalidades.  
En el siguiente comando importo algunas de las __funciones__ que uso a lo largo de los ejercicos.

In [2]:
# Imports
from pyspark.sql.functions import trim, col, length, lit, round
from pyspark.sql import Window

Importamos el archivo con diferentes opciones:
* Con __header__ indicamos si los datos vienen con una linea al principio con la estructura.
* Con __inferSchema__ indicamos si queremos que se use el header como schema.
* Con __delimiter__ indicamos el caracter que separa los valores de las distintas columnas.
* Con __quotes__ indicamos si el caracter que delimita los valores de las columnas.
* __emptyValue__ sirve para convertir los nulls en el valor que queramos.

In [3]:
# 6.1 Importar csv con opcs que quiten espacios y cambien null por 0
ruta = "C:/Users/carlos.ibanez/Documents/CursoBigData/Rango_Edades_Seccion_202012.csv"
padron_df_raw = (spark.read.format("csv")
             .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ";")
            .option("quotes", "\"")
            .option("emptyValue", 0)
            .load(ruta))
padron_df_raw.show(5)

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|CENTRO              |            101|PALACIO             |         1|            1006|          6|         103|               0|               1|                 0|                 0|
|           1|CENTRO              |            101|PALACIO             |         1|            1007|          7|           0|               1|               1|                 0|                 3|
|         

Esta función nos sirve para borrar todos los espacios en blanco.

In [4]:
# Limpiar espacios
padron_df = padron_df_raw.select([(trim(i[0])).alias(i[0]) if i[1] == "string" else i[0] for i in padron_df_raw.select("*").dtypes])

padron_df.show(5)

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|       CENTRO|            101|    PALACIO|         1|            1006|          6|         103|               0|               1|                 0|                 0|
|           1|       CENTRO|            101|    PALACIO|         1|            1007|          7|           0|               1|               1|                 0|                 3|
|           1|       CENTRO|            101|    PALACIO|         1|            1007|      

Spark nos permite utilizar las funciones típicas de __sql__ como comandos en un _DataFrame_.

In [5]:
# 6.3 Enumerar todos los barrios diferentes
barrios = (padron_df
           .select("DESC_BARRIO")
           .distinct())
barrios.show()

+--------------------+
|         DESC_BARRIO|
+--------------------+
|        VALDEFUENTES|
|       LOS JERONIMOS|
|            ABRANTES|
|            VALVERDE|
|              CORTES|
|   PALOMERAS SURESTE|
|CIUDAD UNIVERSITARIA|
|      CUATRO VIENTOS|
|           TRAFALGAR|
|    ALAMEDA DE OSUNA|
|              HELLIN|
|          PRADOLONGO|
|            MOSCARDO|
|          VALDEZARZA|
|           RECOLETOS|
|             HORCAJO|
|        EL CAÑAVERAL|
|             EL VISO|
|        VISTA ALEGRE|
|    PUERTA DEL ANGEL|
+--------------------+
only showing top 20 rows



También nos permite crear __views__ y usar directamente toda la sintaxis sql.

In [6]:
# 6.4 Contar número de barrios a través de view padrón
padron_df.createOrReplaceTempView("padron")
spark.sql("select count(distinct desc_barrio) from padron").show()


+---------------------------+
|count(DISTINCT desc_barrio)|
+---------------------------+
|                        132|
+---------------------------+



A parte de seleccionar columnas de los datos originales, podemos __crear nuevas columnas__ a partir de ellas.

In [7]:
# 6.5 Crear nueva columna que muestre la longitud de desc_distrito
padron_length = (padron_df
                 .withColumn("Longitud", length(col("DESC_DISTRITO")))
                 .select("Desc_Distrito", "Longitud")
                 .distinct()
                 .show())

+-------------------+--------+
|      Desc_Distrito|Longitud|
+-------------------+--------+
|         ARGANZUELA|      10|
|FUENCARRAL-EL PARDO|      19|
|              USERA|       5|
|          SALAMANCA|       9|
| PUENTE DE VALLECAS|      18|
|  VILLA DE VALLECAS|      17|
|           CHAMBERI|       8|
|          VICALVARO|       9|
|             RETIRO|       6|
|             CENTRO|       6|
|SAN BLAS-CANILLEJAS|      19|
|          CHAMARTIN|       9|
|             LATINA|       6|
|          MORATALAZ|       9|
|            BARAJAS|       7|
|             TETUAN|       6|
|      CIUDAD LINEAL|      13|
|          HORTALEZA|       9|
|         VILLAVERDE|      10|
|        CARABANCHEL|      11|
+-------------------+--------+
only showing top 20 rows



O usando valores literales o no relacionados con los datos originales.

In [8]:
# Crear una columna que muestre el valor 5
padron_5 = (padron_df
            .withColumn("Cinco", lit("5"))
            .select("Desc_Distrito", "Cinco")
            .distinct())
padron_5.show()

+-------------------+-----+
|      Desc_Distrito|Cinco|
+-------------------+-----+
|          VICALVARO|    5|
|             TETUAN|    5|
|           CHAMBERI|    5|
|             RETIRO|    5|
|  VILLA DE VALLECAS|    5|
|         VILLAVERDE|    5|
|        CARABANCHEL|    5|
|              USERA|    5|
|            BARAJAS|    5|
|          CHAMARTIN|    5|
|    MONCLOA-ARAVACA|    5|
|      CIUDAD LINEAL|    5|
|          MORATALAZ|    5|
|         ARGANZUELA|    5|
|          SALAMANCA|    5|
|          HORTALEZA|    5|
|SAN BLAS-CANILLEJAS|    5|
|             CENTRO|    5|
|FUENCARRAL-EL PARDO|    5|
| PUENTE DE VALLECAS|    5|
+-------------------+-----+
only showing top 20 rows



Del mismo modo que podemos añadir columnas, también podemos __quitarlas__.

In [9]:
# 6.7 Borrar la columna
padron_drop = padron_5.drop("Cinco")
padron_drop.show()

+-------------------+
|      Desc_Distrito|
+-------------------+
|          VICALVARO|
|             TETUAN|
|           CHAMBERI|
|             RETIRO|
|  VILLA DE VALLECAS|
|         VILLAVERDE|
|        CARABANCHEL|
|              USERA|
|            BARAJAS|
|          CHAMARTIN|
|    MONCLOA-ARAVACA|
|      CIUDAD LINEAL|
|          MORATALAZ|
|         ARGANZUELA|
|          SALAMANCA|
|          HORTALEZA|
|SAN BLAS-CANILLEJAS|
|             CENTRO|
|FUENCARRAL-EL PARDO|
| PUENTE DE VALLECAS|
+-------------------+
only showing top 20 rows



Spark nos da la opción de __particionar__ las tablas por columnas.

In [10]:
# 6.8 Particionar DataFrame por Desc_Distrito y Desc_Barrio
padron_part = (padron_df
              .repartition(col("Desc_Distrito"), col("Desc_Barrio")))

También podemos alamacenar los dataframes en __caché__.
Esto resulta muy util si vamos a ejecutar un comando repetidas veces, ya que hace que la primera ejecución sea más lenta, pero las demás __mucho más rápidas__.

In [11]:
# 6.9 Almacenar en cache
padron_part.cache()
padron_part.count()
padron_part.count()

237675

Se pueden usar las funciones típicas de sql como sum, avg, min, max... y agruparlas.

In [12]:
# 6.10 Obtener totales de españoles y extranjeros por barrio y distrito
(padron_part.select("Desc_Distrito", "Desc_Barrio",
                                 "EspanolesHombres", "EspanolesMujeres", "ExtranjerosHombres", "ExtranjerosMujeres")
 .groupBy("Desc_Distrito", "Desc_Barrio")
 .sum("EspanolesHombres", "EspanolesMujeres", "ExtranjerosHombres", "ExtranjerosMujeres")
 .withColumnRenamed("sum(EspanolesHombres)","Espanoles")
 .withColumnRenamed("sum(EspanolesMujeres)","Espanolas")
 .withColumnRenamed("sum(ExtranjerosHombres)","Extranjeros")
 .withColumnRenamed("sum(ExtranjerosMujeres)","Extranjeras")
 .show())

+-------------------+--------------------+---------+---------+-----------+-----------+
|      Desc_Distrito|         Desc_Barrio|Espanoles|Espanolas|Extranjeros|Extranjeras|
+-------------------+--------------------+---------+---------+-----------+-----------+
|FUENCARRAL-EL PARDO|          MIRASIERRA|    15846|    16938|        686|       1116|
|          SALAMANCA|          CASTELLANA|     6156|     7811|       1364|       1900|
|SAN BLAS-CANILLEJAS|         EL SALVADOR|     4942|     5456|        462|        553|
|    MONCLOA-ARAVACA|       CASA DE CAMPO|     5473|     6391|        657|        659|
|          HORTALEZA|        VALDEFUENTES|    28206|    28615|       2824|       3748|
|          MORATALAZ|          MARROQUINA|    11544|    13430|        844|       1049|
|             TETUAN|       BELLAS VISTAS|    10268|    12092|       3240|       4011|
|             CENTRO|            JUSTICIA|     7099|     6841|       2228|       2103|
|             CENTRO|         UNIVERSIDAD| 

In [13]:
# 6.11 Quitar cache
padron_part.unpersist()

DataFrame[COD_DISTRITO: int, DESC_DISTRITO: string, COD_DIST_BARRIO: int, DESC_BARRIO: string, COD_BARRIO: int, COD_DIST_SECCION: int, COD_SECCION: int, COD_EDAD_INT: int, EspanolesHombres: int, EspanolesMujeres: int, ExtranjerosHombres: int, ExtranjerosMujeres: int]

Se pueden hacer joins entre distintas tablas.

In [14]:
# 6.12 Crear un DataFrame que solo muestre Desc_Barrio, Desc_Distrito y el numero total de EspanolesHombres
padron_esho = (padron_df.select("Desc_Barrio", "Desc_Distrito", "EspanolesHombres")
              .groupBy("Desc_Barrio", "Desc_Distrito")
              .sum("EspanolesHombres")
              .withColumnRenamed("sum(EspanolesHombres)","Espanoles")
              .withColumnRenamed("Desc_Barrio","Descripcion_Barrio")
              .withColumnRenamed("Desc_Distrito","Descripcion_Distrito"))

(padron_esho.join(padron_df,
                  (padron_df.DESC_BARRIO == padron_esho.Descripcion_Barrio) &
                  (padron_df.DESC_DISTRITO == padron_esho.Descripcion_Distrito))
.select("Descripcion_Barrio", "Descripcion_Distrito", "Espanoles")
.distinct()
.show())

+------------------+--------------------+---------+
|Descripcion_Barrio|Descripcion_Distrito|Espanoles|
+------------------+--------------------+---------+
|           ACACIAS|          ARGANZUELA|    15438|
|          VALVERDE| FUENCARRAL-EL PARDO|    26976|
|    FUENTELARREINA| FUENCARRAL-EL PARDO|     1431|
|           PAVONES|           MORATALAZ|     3657|
|         EL GOLOSO| FUENCARRAL-EL PARDO|     8898|
|          CANILLAS|           HORTALEZA|    17029|
| SAN JUAN BAUTISTA|       CIUDAD LINEAL|     5239|
|       PROSPERIDAD|           CHAMARTIN|    14638|
|       COSTILLARES|       CIUDAD LINEAL|     9871|
|           LEGAZPI|          ARGANZUELA|     8942|
|       PEÑA GRANDE| FUENCARRAL-EL PARDO|    19103|
|      EL CAÑAVERAL|           VICALVARO|     1993|
|        CAMPAMENTO|              LATINA|     7187|
|        GAZTAMBIDE|            CHAMBERI|     8960|
|  PUERTA DEL ANGEL|              LATINA|    15250|
|        SAN ISIDRO|         CARABANCHEL|    14493|
|           

Spark tiene la opción de crear funciones de ventana.

In [15]:
# 6.13 Repetir usando funciones ventana
from pyspark.sql import functions as F

p = Window.partitionBy("Desc_Barrio", "Desc_Distrito")

padron_win = (padron_df
.withColumn("Total", F.sum("EspanolesHombres").over(p))
.select("Desc_Barrio", "Desc_Distrito", "Total")
.distinct())

padron_win.show()

+-----------------+-------------------+-----+
|      Desc_Barrio|      Desc_Distrito|Total|
+-----------------+-------------------+-----+
|          ACACIAS|         ARGANZUELA|15438|
|         VALVERDE|FUENCARRAL-EL PARDO|26976|
|   FUENTELARREINA|FUENCARRAL-EL PARDO| 1431|
|          PAVONES|          MORATALAZ| 3657|
|        EL GOLOSO|FUENCARRAL-EL PARDO| 8898|
|         CANILLAS|          HORTALEZA|17029|
|SAN JUAN BAUTISTA|      CIUDAD LINEAL| 5239|
|      PROSPERIDAD|          CHAMARTIN|14638|
|      COSTILLARES|      CIUDAD LINEAL| 9871|
|          LEGAZPI|         ARGANZUELA| 8942|
|      PEÑA GRANDE|FUENCARRAL-EL PARDO|19103|
|     EL CAÑAVERAL|          VICALVARO| 1993|
|       CAMPAMENTO|             LATINA| 7187|
|       GAZTAMBIDE|           CHAMBERI| 8960|
| PUERTA DEL ANGEL|             LATINA|15250|
|       SAN ISIDRO|        CARABANCHEL|14493|
|             GOYA|          SALAMANCA|10955|
|      UNIVERSIDAD|             CENTRO|12679|
|         NUMANCIA| PUENTE DE VALL

Con __pivot__ podemos pasar el valor de una fila como columna.

In [16]:
# Con pivot mostrar la media de mujeres españolas por barrio y rango de edad
# con "centro", "barajas" y "retiro" como columnas

padron_pivot = (padron_df.groupBy("Cod_Edad_Int")
               .pivot("Desc_Distrito", ["BARAJAS", "CENTRO", "RETIRO"])
               .avg("EspanolesMujeres")
               .orderBy("Cod_Edad_Int")
              .withColumnRenamed("BARAJAS","Barajas")
              .withColumnRenamed("CENTRO","Centro")
              .withColumnRenamed("RETIRO","Retiro"))

padron_pivot.show()

+------------+-----------------+------------------+------------------+
|Cod_Edad_Int|          Barajas|            Centro|            Retiro|
+------------+-----------------+------------------+------------------+
|           0|5.483870967741935|2.3545454545454545|3.4193548387096775|
|           1|5.774193548387097|2.3423423423423424|3.9361702127659575|
|           2|6.741935483870968|2.3394495412844036| 4.258064516129032|
|           3|7.580645161290323|2.2181818181818183| 4.531914893617022|
|           4|8.064516129032258| 2.238532110091743| 4.638297872340425|
|           5|8.225806451612904| 2.272727272727273| 4.585106382978723|
|           6|7.838709677419355|2.2818181818181817|  4.76595744680851|
|           7|8.709677419354838| 2.128440366972477| 4.585106382978723|
|           8|8.129032258064516| 2.390909090909091| 4.659574468085107|
|           9|8.580645161290322| 2.409090909090909| 4.537634408602151|
|          10|8.516129032258064|2.3423423423423424| 4.425531914893617|
|     

In [17]:
# 6.15 Crear 3 nuevas columnas que hagan referencia a qué % de EspanolesMujeres de cada rango de edad
# representa cada distrito. Redondea a 2 decimales
tot = col("Barajas") + col("Centro") + col("Retiro")
padron_percent = (padron_pivot
                  .withColumn("Barajas_Percent",
                             round(col("Barajas") / tot * 100, 2))
                  .withColumn("Centro_Percent",
                             round(col("Centro") / tot * 100, 2))
                  .withColumn("Retiro_Percent",
                             round(col("Retiro") / tot * 100, 2)))

padron_percent.show()

+------------+-----------------+------------------+------------------+---------------+--------------+--------------+
|Cod_Edad_Int|          Barajas|            Centro|            Retiro|Barajas_Percent|Centro_Percent|Retiro_Percent|
+------------+-----------------+------------------+------------------+---------------+--------------+--------------+
|           0|5.483870967741935|2.3545454545454545|3.4193548387096775|          48.71|         20.91|         30.37|
|           1|5.774193548387097|2.3423423423423424|3.9361702127659575|          47.91|         19.43|         32.66|
|           2|6.741935483870968|2.3394495412844036| 4.258064516129032|          50.54|         17.54|         31.92|
|           3|7.580645161290323|2.2181818181818183| 4.531914893617022|           52.9|         15.48|         31.62|
|           4|8.064516129032258| 2.238532110091743| 4.638297872340425|          53.97|         14.98|         31.04|
|           5|8.225806451612904| 2.272727272727273| 4.5851063829

Del mismo modo que hemos leído un archivo csv a dataframe, podemos hacer el proceso contrario y guardar un dataframe como csv o cualquier otro formato.

In [18]:
# 6.16 Guardar el archivo csv particionado por distrito y barrio en local
ruta = "Desktop/PadronCSV"
(padron_part.write
 .partitionBy("Desc_Distrito", "Desc_Barrio")
 .format("csv")
 .mode("overwrite")
 .save(ruta))