In [83]:
# Importamos librerias
import findspark
findspark.init()
import pyspark
from pyspark.sql import Window
from pyspark.sql.functions import lit, trim, col, length, round

# Creamos sesion
from pyspark.sql import SparkSession
spark = (SparkSession
        .builder
        .appName("Padron")
        .getOrCreate())

In [84]:
# Cargamos el csv con la estructura correcta, eliminando "", sustituyendo "" como nulos por 0 y quitando espacios innecesarios
file = 'padron.csv'

padron = (spark.read.format('csv')
         .option('header', 'true')
         .option('inferSchema', 'true')
         .option('delimiter', ';')
         .option('quotes', '\"')
         .option('emptyValue', 0)
         .load(file))
# Para eliminar los espacios en blanco innecesarios:
padron_df = padron.select([(trim(i[0])).alias(i[0]) if i[1] == "string" else i[0] for i in padron.select("*").dtypes])


padron_df.show(10)

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           0|               2|               3|                 1|                 0|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           1|               7|               0|                 1|                 0|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|      

In [85]:
# Enumerar los distintos barrios
neighborhood = (padron_df.select('DESC_BARRIO')
               .distinct())
neighborhood.show()

+--------------------+
|         DESC_BARRIO|
+--------------------+
|        VALDEFUENTES|
|            ABRANTES|
|       LOS JERONIMOS|
|          NI�O JESUS|
|            VALVERDE|
|              CORTES|
|   PALOMERAS SURESTE|
|CIUDAD UNIVERSITARIA|
|      CUATRO VIENTOS|
|           TRAFALGAR|
|              HELLIN|
|    ALAMEDA DE OSUNA|
|          PRADOLONGO|
|            MOSCARDO|
|          VALDEZARZA|
|           RECOLETOS|
|             HORCAJO|
|        VISTA ALEGRE|
|             EL VISO|
|    PUERTA DEL ANGEL|
+--------------------+
only showing top 20 rows



In [86]:
# Contar a traves de las views, el número de barrios
padron_df.createOrReplaceTempView('padron')

spark.sql('select count(distinct DESC_BARRIO) AS Total_Barrios FROM padron').show()

+-------------+
|Total_Barrios|
+-------------+
|          131|
+-------------+



In [87]:
# Crea una nueva columna que muestre la longitud de los campos de la columna DESC_DISTRITO y que se llame "longitud"
padron_lon = (padron_df
                 .withColumn("Longitud", length(col("DESC_DISTRITO")))
                 .select("Desc_Distrito", "Longitud")
                 .distinct()
                 .show())

+-------------------+--------+
|      Desc_Distrito|Longitud|
+-------------------+--------+
|         ARGANZUELA|      10|
|FUENCARRAL-EL PARDO|      19|
|              USERA|       5|
|          SALAMANCA|       9|
| PUENTE DE VALLECAS|      18|
|  VILLA DE VALLECAS|      17|
|           CHAMBERI|       8|
|          VICALVARO|       9|
|             RETIRO|       6|
|             CENTRO|       6|
|SAN BLAS-CANILLEJAS|      19|
|          CHAMARTIN|       9|
|             LATINA|       6|
|          MORATALAZ|       9|
|            BARAJAS|       7|
|             TETUAN|       6|
|      CIUDAD LINEAL|      13|
|          HORTALEZA|       9|
|         VILLAVERDE|      10|
|        CARABANCHEL|      11|
+-------------------+--------+
only showing top 20 rows



In [88]:
# Crear una columna que muestre el valor 5 para cada uno de los registro de la tabla
padron5 = (padron_df
          .withColumn('Cinco', lit('5'))
          .select('DESC_DISTRITO', 'Cinco')
          .distinct())
padron5.show()

+-------------------+-----+
|      DESC_DISTRITO|Cinco|
+-------------------+-----+
|          VICALVARO|    5|
|             TETUAN|    5|
|           CHAMBERI|    5|
|             RETIRO|    5|
|  VILLA DE VALLECAS|    5|
|         VILLAVERDE|    5|
|        CARABANCHEL|    5|
|              USERA|    5|
|            BARAJAS|    5|
|          CHAMARTIN|    5|
|    MONCLOA-ARAVACA|    5|
|      CIUDAD LINEAL|    5|
|          MORATALAZ|    5|
|         ARGANZUELA|    5|
|          SALAMANCA|    5|
|          HORTALEZA|    5|
|SAN BLAS-CANILLEJAS|    5|
|             CENTRO|    5|
|FUENCARRAL-EL PARDO|    5|
| PUENTE DE VALLECAS|    5|
+-------------------+-----+
only showing top 20 rows



In [89]:
#Borra esta columna
drop_cinco = padron5.drop('Cinco')
drop_cinco.show()

+-------------------+
|      DESC_DISTRITO|
+-------------------+
|          VICALVARO|
|             TETUAN|
|           CHAMBERI|
|             RETIRO|
|  VILLA DE VALLECAS|
|         VILLAVERDE|
|        CARABANCHEL|
|              USERA|
|            BARAJAS|
|          CHAMARTIN|
|    MONCLOA-ARAVACA|
|      CIUDAD LINEAL|
|          MORATALAZ|
|         ARGANZUELA|
|          SALAMANCA|
|          HORTALEZA|
|SAN BLAS-CANILLEJAS|
|             CENTRO|
|FUENCARRAL-EL PARDO|
| PUENTE DE VALLECAS|
+-------------------+
only showing top 20 rows



In [90]:
# Particiona el dataframe por las variables DESC_DISTRITO y DESC_BARRIO
partition = (padron_df
            .repartition(col('DESC_BARRIO'), col('DESC_DISTRITO')))

In [75]:
#Almacénalo en caché. Consulta en el puerto 4040 (UI de Spark) de tu usuario local el estado
# de los rdds almacenados.
partition.cache()
partition.count()
partition.count()

238196

In [76]:
# Lanza una consulta contra el DF resultante en la que muestre el número total de 
#"espanoleshombres", "espanolesmujeres", extranjeroshombres" y "extranjerosmujeres" 
#para cada barrio de cada distrito. Las columnas distrito y barrio deben ser las primeras en 
#aparecer en el show. Los resultados deben estar ordenados en orden de más a menos 
#según la columna "extranjerosmujeres" y desempatarán por la columna 
#"extranjeroshombres".
(partition.select('DESC_DISTRITO', 'DESC_BARRIO', 'EspanolesHombres', 'EspanolesMujeres', 'ExtranjerosHombres', 'ExtranjerosMujeres')
         .groupBy('DESC_DISTRITO', 'DESC_BARRIO')
         .sum('EspanolesHombres', 'EspanolesMujeres', 'ExtranjerosHombres', 'ExtranjerosMujeres')
         .withColumnRenamed('sum(EspanolesHombres)', 'Total Españoles')
         .withColumnRenamed('sum(EspanolesMujeres)', 'Total Españolas')
         .withColumnRenamed('sum(ExtranjerosHombres)', 'Total Extranjeros H')
         .withColumnRenamed('sum(ExtranjerosMujeres)', 'Total Extranjeras F')
         .show())
        

+-------------------+-----------------+---------------+---------------+-------------------+-------------------+
|      DESC_DISTRITO|      DESC_BARRIO|Total Españoles|Total Españolas|Total Extranjeros H|Total Extranjeras F|
+-------------------+-----------------+---------------+---------------+-------------------+-------------------+
|         ARGANZUELA|          ACACIAS|          15399|          18073|               1355|               1507|
|FUENCARRAL-EL PARDO|         VALVERDE|          26922|          29105|               3675|               4441|
|FUENCARRAL-EL PARDO|   FUENTELARREINA|           1445|           1679|                 72|                171|
|          MORATALAZ|          PAVONES|           3584|           4321|                419|                478|
|FUENCARRAL-EL PARDO|        EL GOLOSO|           8947|           9076|                551|                630|
|          HORTALEZA|         CANILLAS|          16770|          19155|               1764|             

In [77]:
# Eliminar el registro caché
partition.unpersist()

DataFrame[COD_DISTRITO: int, DESC_DISTRITO: string, COD_DIST_BARRIO: int, DESC_BARRIO: string, COD_BARRIO: int, COD_DIST_SECCION: int, COD_SECCION: int, COD_EDAD_INT: int, EspanolesHombres: int, EspanolesMujeres: int, ExtranjerosHombres: int, ExtranjerosMujeres: int]

In [78]:
#Crea un nuevo DataFrame a partir del original que muestre únicamente una columna con 
#DESC_BARRIO, otra con DESC_DISTRITO y otra con el número total de "espanoleshombres" 
#residentes en cada distrito de cada barrio. Únelo (con un join) con el DataFrame original a 
#través de las columnas en común.

espanoles = (padron_df.select('DESC_BARRIO', 'DESC_DISTRITO', 'EspanolesHombres')
                     .groupBy('DESC_BARRIO', 'DESC_DISTRITO')
                     .sum('EspanolesHombres')
                     .withColumnRenamed('sum(EspanolesHombres)', 'Españoles')
                     .withColumnRenamed('DESC_BARRIO', 'BARRIO')
                     .withColumnRenamed('DESC_DISTRITO', 'DISTRITO'))
        
(espanoles.join(padron_df,
               (padron_df.DESC_BARRIO == espanoles.BARRIO) & 
               (padron_df.DESC_DISTRITO == espanoles.DISTRITO))
.select('DISTRITO', 'BARRIO', 'Españoles')
.distinct()
.show())


+-------------------+-----------------+---------+
|           DISTRITO|           BARRIO|Españoles|
+-------------------+-----------------+---------+
|         ARGANZUELA|          ACACIAS|    15399|
|FUENCARRAL-EL PARDO|         VALVERDE|    26922|
|FUENCARRAL-EL PARDO|   FUENTELARREINA|     1445|
|          MORATALAZ|          PAVONES|     3584|
|FUENCARRAL-EL PARDO|        EL GOLOSO|     8947|
|          HORTALEZA|         CANILLAS|    16770|
|      CIUDAD LINEAL|SAN JUAN BAUTISTA|     5182|
|          CHAMARTIN|      PROSPERIDAD|    14490|
|      CIUDAD LINEAL|      COSTILLARES|     9778|
|         ARGANZUELA|          LEGAZPI|     8897|
|             LATINA|       CAMPAMENTO|     7102|
|           CHAMBERI|       GAZTAMBIDE|     8921|
|             LATINA| PUERTA DEL ANGEL|    15120|
|        CARABANCHEL|       SAN ISIDRO|    14537|
|          SALAMANCA|             GOYA|    10913|
|             CENTRO|      UNIVERSIDAD|    12410|
| PUENTE DE VALLECAS|         NUMANCIA|    17151|


In [79]:
# Repite la función anterior utilizando funciones de ventana. (over(Window.partitionBy.....)).
from pyspark.sql import functions as F

w = Window.partitionBy('DESC_DISTRITO', 'DESC_BARRIO')

padron_w = (padron_df
           .withColumn('Total Españoles Hombres', F.sum('EspanolesHombres').over(w))
           .select('DESC_BARRIO', 'DESC_DISTRITO', 'Total Españoles Hombres')
           .distinct())

padron_w.show()


+-----------------+-------------------+-----------------------+
|      DESC_BARRIO|      DESC_DISTRITO|Total Españoles Hombres|
+-----------------+-------------------+-----------------------+
|       MIRASIERRA|FUENCARRAL-EL PARDO|                  16230|
|       CASTELLANA|          SALAMANCA|                   6106|
|      EL SALVADOR|SAN BLAS-CANILLEJAS|                   4867|
|     VALDEFUENTES|          HORTALEZA|                  28743|
|    CASA DE CAMPO|    MONCLOA-ARAVACA|                   5421|
|       MARROQUINA|          MORATALAZ|                  11357|
|    BELLAS VISTAS|             TETUAN|                  10193|
|     EL CA�AVERAL|          VICALVARO|                   3934|
|         JUSTICIA|             CENTRO|                   7048|
|      UNIVERSIDAD|             CENTRO|                  12410|
|          ATALAYA|      CIUDAD LINEAL|                    607|
|       BERRUGUETE|             TETUAN|                   8628|
|CASCO H.VICALVARO|          VICALVARO| 

In [80]:
# Mediante una función Pivot muestra una tabla (que va a ser una tabla de contingencia) que
#contenga los valores totales ()la suma de valores (avg) de espanolesmujeres para cada distrito y 
#en cada rango de edad (COD_EDAD_INT). Los distritos incluidos deben ser únicamente 
#CENTRO, BARAJAS y RETIRO y deben figurar como columnas.

pivot_df = (padron_df.groupBy('COD_EDAD_INT').pivot('DESC_DISTRITO', ['CENTRO', 'RETIRO', 'BARAJAS'])
           .avg('EspanolesMujeres')
           .orderBy('COD_EDAD_INT'))

pivot_df.show()

+------------+------------------+------------------+-----------------+
|COD_EDAD_INT|            CENTRO|            RETIRO|          BARAJAS|
+------------+------------------+------------------+-----------------+
|           0|2.1621621621621623| 3.161290322580645|4.866666666666666|
|           1| 2.209090909090909|3.6808510638297873|6.419354838709677|
|           2| 2.009009009009009| 3.648936170212766|5.806451612903226|
|           3| 2.081818181818182| 4.118279569892473|6.580645161290323|
|           4| 2.054054054054054| 4.457446808510638|7.451612903225806|
|           5| 2.081081081081081|               4.5|7.838709677419355|
|           6|2.3363636363636364| 4.542553191489362|8.290322580645162|
|           7| 2.190909090909091| 4.627659574468085|7.870967741935484|
|           8|2.0458715596330275| 4.553191489361702| 8.64516129032258|
|           9|2.3518518518518516| 4.574468085106383|7.903225806451613|
|          10|2.2844036697247705| 4.494623655913978| 8.67741935483871|
|     

In [81]:
#Utilizando este nuevo DF, crea 3 columnas nuevas que hagan referencia a qué porcentaje 
#de la suma de "espanolesmujeres" en los tres distritos para cada rango de edad representa 
#cada uno de los tres distritos. Debe estar redondeada a 2 decimales. Puedes imponerte la 
#condición extra de no apoyarte en ninguna columna auxiliar creada para el caso.

porcentaje = col('BARAJAS') + col('CENTRO') + col('RETIRO')

padron_p = (pivot_df
                    .withColumn('Barajas Porcentaje', round(col('BARAJAS')/ porcentaje*100, 2))
                    .withColumn('Centro Porcentaje', round(col('CENTRO')/ porcentaje*100, 2))
                    .withColumn('Retiro Porcentaje', round(col('RETIRO')/ porcentaje*100, 2))
                    )
                                
padron_p.show()

+------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+
|COD_EDAD_INT|            CENTRO|            RETIRO|          BARAJAS|Barajas Porcentaje|Centro Porcentaje|Retiro Porcentaje|
+------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+
|           0|2.1621621621621623| 3.161290322580645|4.866666666666666|             47.76|            21.22|            31.02|
|           1| 2.209090909090909|3.6808510638297873|6.419354838709677|             52.15|            17.95|             29.9|
|           2| 2.009009009009009| 3.648936170212766|5.806451612903226|             50.65|            17.52|            31.83|
|           3| 2.081818181818182| 4.118279569892473|6.580645161290323|             51.49|            16.29|            32.22|
|           4| 2.054054054054054| 4.457446808510638|7.451612903225806|             53.37|            14.71|           

Py4JJavaError: An error occurred while calling o813.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
	... 32 more


In [None]:
# Guarda el archivo csv original particionado por distrito y por barrio en un directorio local. Consulta el directorio
# para ver la estructura de los ficheros y comprueba que es la esperada
file = 'padronSpark/padronCsv'
(partition.write
    .partitionBy('DESC_DISTRITO', 'DESC_BARRIO')
    .format('csv')
    .mode('overwrite')
    .save(file))

In [None]:
# En parquet
file = 'padronSpark/padronParquet'
(partition.write
    .partitionBy('DESC_DISTRITO', 'DESC_BARRIO')
    .parquet(file))