In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [4]:
spark = (SparkSession
    .builder
    .appName("Padron")
    .getOrCreate())

In [5]:
padronDF = spark.read.format("csv")\
.option("inferSchema", True)\
.option("header", True)\
.option("emptyValue", "0")\
.option("quote", "\"")\
.option("escape", "\"")\
.option("ignoreLeadingWhiteSpace", True)\
.option("ignoreTrailingWhiteSpace", True)\
.option("sep", ";")\
.option("encoding", "ISO-8859-1")\
.load("C:/Datasets/Padron.csv")

Los argumentos de importación que operan sobre los WhiteSpace no funcionan en este caso por que los campos están rodeados por comillas. Los dejamos para dar constancia de su existencia para cuando puedan ser de utilidad.

In [38]:
padronDF.dtypes

[('COD_DISTRITO', 'int'),
 ('DESC_DISTRITO', 'string'),
 ('COD_DIST_BARRIO', 'int'),
 ('DESC_BARRIO', 'string'),
 ('COD_BARRIO', 'int'),
 ('COD_DIST_SECCION', 'int'),
 ('COD_SECCION', 'int'),
 ('COD_EDAD_INT', 'int'),
 ('EspanolesHombres', 'int'),
 ('EspanolesMujeres', 'int'),
 ('ExtranjerosHombres', 'int'),
 ('ExtranjerosMujeres', 'int')]

Importa corectamente las Ñ

In [40]:
padronDF.select(F.col("DESC_BARRIO")).distinct().show(40)

+--------------------+
|         DESC_BARRIO|
+--------------------+
|EMBAJADORES         |
|SANTA EUGENIA       |
|LEGAZPI             |
|DELICIAS            |
|CIUDAD UNIVERSITARIA|
|CONCEPCION          |
|VENTAS              |
|IMPERIAL            |
|SAN FERMIN          |
|PEÑA GRANDE         |
|ESTRELLA            |
|TRAFALGAR           |
|ALUCHE              |
|PUERTA BONITA       |
|ADELFAS             |
|PUEBLO NUEVO        |
|PALOMAS             |
|CORRALEJOS          |
|LOS CARMENES        |
|VALDEZARZA          |
|PUERTA DEL ANGEL    |
|ORCASITAS           |
|ZOFIO               |
|SAN ISIDRO          |
|LOS JERONIMOS       |
|NIÑO JESUS          |
|ARCOS               |
|NUEVA ESPAÑA        |
|QUINTANA            |
|AEROPUERTO          |
|FUENTELARREINA      |
|VALLEHERMOSO        |
|CANILLAS            |
|PALACIO             |
|ALMENDRALES         |
|SAN CRISTOBAL       |
|CORTES              |
|CASTELLANA          |
|EL SALVADOR         |
|CHOPERA             |
+----------

Podemos guardarlo como parquet.

In [41]:
padronDF.write.parquet("C:/Users/luismiguel.dominguez/Desktop/Spark/Esritura/padron_parquet")

O pasarlo a una tabla temporal para tirar queries desde SQL.

In [42]:
padronDF.createOrReplaceTempView("padron")

In [53]:
vista = spark.sql("SELECT distinct(DESC_BARRIO, DESC_DISTRITO) from padron")

In [56]:
vista.count()

133

In [63]:
padronDF.select("DESC_DISTRITO").distinct().count()

21

In [64]:
padronDF.select("DESC_BARRIO").distinct().count()

132

Si lo comprobamos, el argumento de la importación del CSV para quitar espacios a la derecha no ha funcionado.

In [6]:
padronDF.select("DESC_DISTRITO", F.length("DESC_DISTRITO")).show()

+--------------------+---------------------+
|       DESC_DISTRITO|length(DESC_DISTRITO)|
+--------------------+---------------------+
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO              |                   20|
|CENTRO   

Los quitamos nosotros mismos con un rtrim.

In [7]:
padronDF_bueno = padronDF.withColumn("DESC_DISTRITO", F.rtrim("DESC_DISTRITO")).withColumn("DESC_BARRIO", F.rtrim("DESC_BARRIO"))

## Vamos a probar a particionar y esas cositas.

Podemos crear el dataframe particionado.

In [8]:
rep = padronDF_bueno.repartition(F.col("DESC_DISTRITO"), F.col("DESC_BARRIO"))
rep.createOrReplaceTempView("padron_part")


In [100]:
print(rep.rdd.getNumPartitions())

200


O guardarlo particionado en alguna dirección concreta del sistema.

In [65]:
padronDF.write.partitionBy("DESC_DISTRITO", "DESC_BARRIO").csv("C:/Users/luismiguel.dominguez/Desktop/Spark/Esritura/padron_particionado", header=True)

Lanzamos la query contra el DF particionado para comprobar que podemos hacer retrieve de los datos desde él.

In [99]:
rep.select("DESC_DISTRITO", "DESC_BARRIO", "espanoleshombres", "espanolesmujeres", "extranjeroshombres", "extranjerosmujeres").where(F.col("DESC_DISTRITO").isin ("CENTRO", "LATINA", "CHAMARTIN", "TETUAN", "VICALVARO", "BARAJAS")
).groupBy(F.col("DESC_DISTRITO"), F.col("DESC_BARRIO")).agg(F.sum(F.col("espanoleshombres")), F.sum(F.col("espanolesmujeres")), F.sum(F.col("extranjeroshombres")), F.sum(F.col("extranjerosmujeres"))).show()

+-------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|DESC_DISTRITO|         DESC_BARRIO|sum(espanoleshombres)|sum(espanolesmujeres)|sum(extranjeroshombres)|sum(extranjerosmujeres)|
+-------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|       TETUAN|       BELLAS VISTAS|                10268|                12092|                   3240|                   4011|
|       CENTRO|            JUSTICIA|                 7099|                 6841|                   2228|                   2103|
|       CENTRO|         UNIVERSIDAD|                12679|                12755|                   4083|                   4078|
|       TETUAN|BARRIOS EN EDIF. BDC|                    0|                    1|                      0|                      0|
|       TETUAN|          BERRUGUETE|                 8775|                10529|                 

No está de más comprobar que el número de resultados para la misma query en el archivo particionado o sin particionar es del mismo tamaño.

In [111]:
rep.select("DESC_DISTRITO", "DESC_BARRIO", "espanoleshombres", "espanolesmujeres", "extranjeroshombres", "extranjerosmujeres").where(F.col("DESC_DISTRITO").isin ("CENTRO", "LATINA", "CHAMARTIN", "TETUAN", "VICALVARO", "BARAJAS")
).groupBy(F.col("DESC_DISTRITO"), F.col("DESC_BARRIO")).agg(F.sum(F.col("espanoleshombres")), F.sum(F.col("espanolesmujeres")), F.sum(F.col("extranjeroshombres")), F.sum(F.col("extranjerosmujeres"))).count()

36

In [113]:
padronDF_bueno.select("DESC_DISTRITO", "DESC_BARRIO", "espanoleshombres", "espanolesmujeres", "extranjeroshombres", "extranjerosmujeres").where(F.col("DESC_DISTRITO").isin ("CENTRO", "LATINA", "CHAMARTIN", "TETUAN", "VICALVARO", "BARAJAS")
).groupBy(F.col("DESC_DISTRITO"), F.col("DESC_BARRIO")).agg(F.sum(F.col("espanoleshombres")), F.sum(F.col("espanolesmujeres")), F.sum(F.col("extranjeroshombres")), F.sum(F.col("extranjerosmujeres"))).count()

36

Como ahora vamos a hacer pruebas de velocidad al consutar vamos a guardar los dos dataframes en caché. Hacemos el cout para invocar una accion posterior y así generar el caché.

In [9]:
rep.cache().count()

237675

In [10]:
padronDF_bueno.cache().count()

237675

In [11]:
rep.select("DESC_DISTRITO", "DESC_BARRIO", "espanoleshombres", "espanolesmujeres", "extranjeroshombres", "extranjerosmujeres").where(F.col("DESC_DISTRITO").isin ("CENTRO", "LATINA", "CHAMARTIN", "TETUAN", "VICALVARO", "BARAJAS")
).groupBy(F.col("DESC_DISTRITO"), F.col("DESC_BARRIO")).agg(F.sum(F.col("espanoleshombres")), F.sum(F.col("espanolesmujeres")), F.sum(F.col("extranjeroshombres")), F.sum(F.col("extranjerosmujeres"))).show()

+-------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|DESC_DISTRITO|         DESC_BARRIO|sum(espanoleshombres)|sum(espanolesmujeres)|sum(extranjeroshombres)|sum(extranjerosmujeres)|
+-------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|       TETUAN|       BELLAS VISTAS|                10268|                12092|                   3240|                   4011|
|       CENTRO|            JUSTICIA|                 7099|                 6841|                   2228|                   2103|
|       CENTRO|         UNIVERSIDAD|                12679|                12755|                   4083|                   4078|
|       TETUAN|BARRIOS EN EDIF. BDC|                    0|                    1|                      0|                      0|
|       TETUAN|          BERRUGUETE|                 8775|                10529|                 

In [12]:
padronDF_bueno.select("DESC_DISTRITO", "DESC_BARRIO", "espanoleshombres", "espanolesmujeres", "extranjeroshombres", "extranjerosmujeres").where(F.col("DESC_DISTRITO").isin ("CENTRO", "LATINA", "CHAMARTIN", "TETUAN", "VICALVARO", "BARAJAS")
).groupBy(F.col("DESC_DISTRITO"), F.col("DESC_BARRIO")).agg(F.sum(F.col("espanoleshombres")), F.sum(F.col("espanolesmujeres")), F.sum(F.col("extranjeroshombres")), F.sum(F.col("extranjerosmujeres"))).show()

+-------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|DESC_DISTRITO|         DESC_BARRIO|sum(espanoleshombres)|sum(espanolesmujeres)|sum(extranjeroshombres)|sum(extranjerosmujeres)|
+-------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|       TETUAN|       BELLAS VISTAS|                10268|                12092|                   3240|                   4011|
|       CENTRO|            JUSTICIA|                 7099|                 6841|                   2228|                   2103|
|       CENTRO|         UNIVERSIDAD|                12679|                12755|                   4083|                   4078|
|       TETUAN|BARRIOS EN EDIF. BDC|                    0|                    1|                      0|                      0|
|       TETUAN|          BERRUGUETE|                 8775|                10529|                 

No hay gran ganancia a la hora de ejecutar dsede particionado o no. Los tiempos se pueden ver en la UI de Spark.