Comenzamos importando las librerías. Es importante observar que se están utilizando alias en los paquetes para referenciar y desambiguar los nombres de las funciones que deriven de ellos.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

Creamos la SparkSession con el master en el sistema local.

In [4]:
spark = (SparkSession \
         .builder \
         .master("local") \
         .enableHiveSupport() \
         .appName("Padron_Python") \
         .getOrCreate())

Importamos el set de datos desde una ruta local, declarando las comillas que envuelven los campos, que nos sustituya los campos vacíos por 0, que infiera el esquema y que considere la primera línea como header.

In [5]:
padronDF = spark.read.format("csv") \
                     .option("inferSchema", True) \
					 .option("header", True) \
					 .option("emptyValue", "0") \
					 .option("quote", "\"") \
					 .option("escape", "\"") \
					 .option("sep", ";") \
					 .option("encoding", "ISO-8859-1") \
					 .load("C:/Users/luismiguel.dominguez/Escritorio/TFM/Dataset/padron1.txt")

Mostramos los tipos para ver que todo se ha importado correctamente.

In [6]:
padronDF.dtypes

[('COD_DISTRITO', 'int'),
 ('DESC_DISTRITO', 'string'),
 ('COD_DIST_BARRIO', 'int'),
 ('DESC_BARRIO', 'string'),
 ('COD_BARRIO', 'int'),
 ('COD_DIST_SECCION', 'int'),
 ('COD_SECCION', 'int'),
 ('COD_EDAD_INT', 'int'),
 ('EspanolesHombres', 'int'),
 ('EspanolesMujeres', 'int'),
 ('ExtranjerosHombres', 'int'),
 ('ExtranjerosMujeres', 'int')]

Comprobamos el número de barrios y de distritos diferentes que hay en nuestro dataset.

In [7]:
padronDF.select(F.col("DESC_BARRIO")).distinct().show(40)
padronDF.select(F.col("DESC_BARRIO")).distinct().count()

+--------------------+
|         DESC_BARRIO|
+--------------------+
|EMBAJADORES         |
|SANTA EUGENIA       |
|LEGAZPI             |
|DELICIAS            |
|CIUDAD UNIVERSITARIA|
|CONCEPCION          |
|VENTAS              |
|IMPERIAL            |
|SAN FERMIN          |
|PEÑA GRANDE         |
|ESTRELLA            |
|TRAFALGAR           |
|ALUCHE              |
|PUERTA BONITA       |
|ADELFAS             |
|PUEBLO NUEVO        |
|PALOMAS             |
|CORRALEJOS          |
|LOS CARMENES        |
|VALDEZARZA          |
|PUERTA DEL ANGEL    |
|ORCASITAS           |
|ZOFIO               |
|SAN ISIDRO          |
|LOS JERONIMOS       |
|NIÑO JESUS          |
|ARCOS               |
|NUEVA ESPAÑA        |
|QUINTANA            |
|AEROPUERTO          |
|FUENTELARREINA      |
|VALLEHERMOSO        |
|CANILLAS            |
|PALACIO             |
|ALMENDRALES         |
|SAN CRISTOBAL       |
|CORTES              |
|CASTELLANA          |
|EL SALVADOR         |
|CHOPERA             |
+----------

132

In [8]:
padronDF.select(F.col("DESC_DISTRITO")).distinct().show(40)
padronDF.select(F.col("DESC_DISTRITO")).distinct().count()

+--------------------+
|       DESC_DISTRITO|
+--------------------+
|VILLA DE VALLECAS   |
|SALAMANCA           |
|CHAMBERI            |
|CHAMARTIN           |
|TETUAN              |
|VILLAVERDE          |
|MORATALAZ           |
|CENTRO              |
|USERA               |
|VICALVARO           |
|HORTALEZA           |
|CIUDAD LINEAL       |
|FUENCARRAL-EL PARDO |
|MONCLOA-ARAVACA     |
|SAN BLAS-CANILLEJAS |
|RETIRO              |
|LATINA              |
|BARAJAS             |
|ARGANZUELA          |
|CARABANCHEL         |
|PUENTE DE VALLECAS  |
+--------------------+



21

Como podemos ver, aún sobran espacios en blanco tras los campos tipo string.

In [9]:
padronDF.select(F.col("DESC_DISTRITO"), F.length("DESC_DISTRITO")).distinct().show()

+--------------------+---------------------+
|       DESC_DISTRITO|length(DESC_DISTRITO)|
+--------------------+---------------------+
|MONCLOA-ARAVACA     |                   20|
|CARABANCHEL         |                   20|
|ARGANZUELA          |                   20|
|CHAMARTIN           |                   20|
|BARAJAS             |                   20|
|TETUAN              |                   20|
|VILLAVERDE          |                   20|
|CHAMBERI            |                   20|
|LATINA              |                   20|
|USERA               |                   20|
|CENTRO              |                   20|
|HORTALEZA           |                   20|
|VICALVARO           |                   20|
|MORATALAZ           |                   20|
|CIUDAD LINEAL       |                   20|
|SALAMANCA           |                   20|
|FUENCARRAL-EL PARDO |                   20|
|VILLA DE VALLECAS   |                   20|
|RETIRO              |                   20|
|PUENTE DE

Los quitamos con la función TRIM y mostramos que el resultado es el adecuado.

In [10]:
padronDF_bueno = padronDF.withColumn("DESC_DISTRITO", F.rtrim("DESC_DISTRITO")) \
                         .withColumn("DESC_BARRIO", F.rtrim("DESC_BARRIO"))

padronDF_bueno.select(F.col("DESC_DISTRITO"), F.length("DESC_DISTRITO")).distinct().show()

+-------------------+---------------------+
|      DESC_DISTRITO|length(DESC_DISTRITO)|
+-------------------+---------------------+
|         ARGANZUELA|                   10|
|FUENCARRAL-EL PARDO|                   19|
|              USERA|                    5|
|          SALAMANCA|                    9|
| PUENTE DE VALLECAS|                   18|
|  VILLA DE VALLECAS|                   17|
|           CHAMBERI|                    8|
|          VICALVARO|                    9|
|             RETIRO|                    6|
|             CENTRO|                    6|
|SAN BLAS-CANILLEJAS|                   19|
|          CHAMARTIN|                    9|
|             LATINA|                    6|
|          MORATALAZ|                    9|
|            BARAJAS|                    7|
|             TETUAN|                    6|
|      CIUDAD LINEAL|                   13|
|          HORTALEZA|                    9|
|         VILLAVERDE|                   10|
|        CARABANCHEL|           

Lanzamos una query compleja similar a la que lanzamos en la shell para comprobar que podemos usar gran variedad de funciones sin problema. Guardamos este DataFrame en caché para que la siguiente consulta que lo utilice sea más rápida.

In [11]:
cont = padronDF_bueno.select(F.col("COD_EDAD_INT"), F.col("DESC_DISTRITO"),F.col("espanolesmujeres")) \
                     .where(F.col("DESC_DISTRITO").isin("CENTRO", "BARAJAS", "RETIRO")) \
                     .groupBy(F.col("COD_EDAD_INT"))\ 
                     .pivot("DESC_DISTRITO").sum("espanolesmujeres") \
                     .orderBy("COD_EDAD_INT")

cont.cache().show()

+------------+-------+------+------+
|COD_EDAD_INT|BARAJAS|CENTRO|RETIRO|
+------------+-------+------+------+
|           0|    170|   259|   318|
|           1|    179|   260|   370|
|           2|    209|   255|   396|
|           3|    235|   244|   426|
|           4|    250|   244|   436|
|           5|    255|   250|   431|
|           6|    243|   251|   448|
|           7|    270|   232|   431|
|           8|    252|   263|   438|
|           9|    266|   265|   422|
|          10|    264|   260|   416|
|          11|    263|   260|   437|
|          12|    285|   259|   427|
|          13|    252|   250|   398|
|          14|    282|   269|   431|
|          15|    263|   247|   423|
|          16|    244|   258|   412|
|          17|    257|   279|   443|
|          18|    213|   256|   413|
|          19|    207|   265|   430|
+------------+-------+------+------+
only showing top 20 rows



Calculamos los porcentajes.

In [12]:
cont.select("*").withColumn("Suma", (F.col("BARAJAS") + F.col("CENTRO")+F.col("RETIRO"))) \
                .withColumn("BARAJAS %", F.round(F.col("BARAJAS")/F.col("Suma")*100,2)) \
                .withColumn("CENTRO %", F.round(F.col("centro")/F.col("Suma")*100,2)) \
                .withColumn("RETIRO %", F.round(F.col("retiro")/F.col("Suma")*100,2)).show()

+------------+-------+------+------+----+---------+--------+--------+
|COD_EDAD_INT|BARAJAS|CENTRO|RETIRO|Suma|BARAJAS %|CENTRO %|RETIRO %|
+------------+-------+------+------+----+---------+--------+--------+
|           0|    170|   259|   318| 747|    22.76|   34.67|   42.57|
|           1|    179|   260|   370| 809|    22.13|   32.14|   45.74|
|           2|    209|   255|   396| 860|     24.3|   29.65|   46.05|
|           3|    235|   244|   426| 905|    25.97|   26.96|   47.07|
|           4|    250|   244|   436| 930|    26.88|   26.24|   46.88|
|           5|    255|   250|   431| 936|    27.24|   26.71|   46.05|
|           6|    243|   251|   448| 942|     25.8|   26.65|   47.56|
|           7|    270|   232|   431| 933|    28.94|   24.87|    46.2|
|           8|    252|   263|   438| 953|    26.44|    27.6|   45.96|
|           9|    266|   265|   422| 953|    27.91|   27.81|   44.28|
|          10|    264|   260|   416| 940|    28.09|   27.66|   44.26|
|          11|    26

Guardamos el resultado de la query en una variable y la escribimos en formato parquet en nuestro sistema de ficheros local.

In [15]:
escritura_DF = cont.select("*").withColumn("Suma", (F.col("BARAJAS") + F.col("CENTRO")+F.col("RETIRO"))) \
                .withColumn("BARAJAS %", F.round(F.col("BARAJAS")/F.col("Suma")*100,2)) \
                .withColumn("CENTRO %", F.round(F.col("centro")/F.col("Suma")*100,2)) \
                .withColumn("RETIRO %", F.round(F.col("retiro")/F.col("Suma")*100,2)).show()


escritura_DF.write.parquet("C:/Users/luismiguel.dominguez/Escritorio/TFM/Dataset/output/padron_output.txt")