<a href="https://colab.research.google.com/github/arredocana/data-analysis/blob/main/An%C3%A1lisis_Padr%C3%B3n_Madrid_usando_PySpark_y_Hive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Análisis del Padrón Municipal de Madrid

La población de un municipio se define como el conjunto de personas inscritas en su Padrón municipal.

Toda persona que viva en España está obligada a inscribirse en el Padrón del municipio en el que resida habitualmente, inscribiéndose en el que habite durante más tiempo al año si vive en varios municipios. 

El Padrón Municipal de Habitantes es el registro administrativo donde constan los habitantes de un municipio, siendo sus datos prueba de la residencia en el municipio y del domicilio habitual.

En el [Portal de datos abiertos del Ayuntamiento de Madrid](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=1d755cde99be2410VgnVCM1000000b205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default) podemos encontrar información del Padrón municipal, detallado a nivel de distrito, barrio y sección censal, y agregado por sexo y edad.

Con esta información, vamos a tratar de responder a las siguientes cuestiones:

1. ¿Cuántos barrios tiene cada distrito?
2. ¿Cuál es el porcentaje de españoles y extranjeros por distrito y barrio?
3. ¿Cuál es la edad media de los ciudadadanos en cada barrio?

Posteriormente practicaremos PySpark realizando más ejercicios.

## Configuración del entorno

Para trabajar con tu notebook en Colaboratory puedes conectarte a un entorno de ejecución **local** o en la **nube**.

En ambos casos, el notebook se guardará en tu cuenta de Google Drive en una carpeta nueva llamada **Colab Notebooks**.

No obstante, puedes descargarte el notebook a tu equipo local en formato .ipynb o .py desde la opción Archivo.

### Local

Si decides trabajar con tu equipo local necesitarás conectarte a un servidor de **Jupyter Notebook**. Los pasos a seguir serían los siguientes:

1. Instalar [Jupyter](http://jupyter.org/install) en tu equipo local.

2. Instala y habilitar la extensión **jupyter_http_over_ws**: 

  `pip install jupyter_http_over_ws jupyter serverextension enable --py jupyter_http_over_ws`

3. Inicia el servidor y autenticarse:

  Abrir consola y ejecutar el siguiente comando:

  `jupyter notebook --NotebookApp.allow_origin='https://colab.research.google.com'--port=8888 --NotebookApp.port_retries=0 `

  Cuando se inicie el servidor, se mostrará un mensaje con la URL de backend inicial utilizada para la autenticación. Copia esta URL. 
  
  Ejemplo: http://localhost:8888/?token=b5caf18c5137318b8914e6881d5036e55867ef06490919dd

4. Conéctate al entorno de ejecución local:

  En Colaboratory, haz clic en el botón Conectar y selecciona Conectar a un entorno de ejecución local. 
  
  Introduce la URL del paso anterior en el cuadro de diálogo que aparece y haz clic en el botón Conectar. Después, debería establecerse la conexión con tu entorno de ejecución local.

Para más información puedes leer la [documentación oficial](https://research.google.com/colaboratory/intl/es/local-runtimes.html).

In [None]:
# Comprobamos ruta del directorio de trabajo 
%pwd

'C:\\Users\\antonio.arredondo\\OneDrive - Bosonit\\Spark Training\\datasets'

In [None]:
file_path = "padron_madrid_2020.csv"

### Nube

Si prefieres trabajar en la **nube** aprovechando los recursos de los servidores de Google tendrás que instalar las librerías que no estén disponibles en la configuración por defecto del notebook e importar los ficheros que sean necesarios. 

Esta operación es necesaria hacerla siempre que te conectes a la nube o inicies un nuevo notebook.

#### Instalación librerías

In [None]:
# Install needed libraries
!pip install -q pyspark
!pip install -q koalas
!pip install -q pyarrow==0.15.1
!pip install -q pyngrok

[K     |████████████████████████████████| 204.8MB 67kB/s 
[K     |████████████████████████████████| 204kB 17.7MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 675kB 12.8MB/s 
[K     |████████████████████████████████| 59.2MB 69kB/s 
[K     |████████████████████████████████| 737kB 12.6MB/s 
[?25h  Building wheel for pyngrok (setup.py) ... [?25l[?25hdone


#### Importación de datos

##### Desde Google Drive

In [None]:
# Mount Drive (authentication required)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Viewing the data in the folder present from the drive
!ls -l "/content/drive/My Drive/Colab Notebooks/datasets"

In [None]:
# Cloud file path
file_path = "drive/MyDrive/Colab\ Notebooks/datasets/padron_madrid_2020.csv"

##### Desde local

In [None]:
# Importing files method from colab for accessing Local file system
from google.colab import files

uploaded = files.upload()

In [None]:
uploaded.keys()

dict_keys(['padron_madrid_2020.csv'])

In [None]:
# Local file path
file_path = "padron_madrid_2020.csv"

# Local file path
file_path = "./OneDrive - Bosonit/Spark Training/datasets/padron_madrid_2020.csv"

## Análisis exploratorio usando PySpark

In [None]:
# Import PySpark and libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
#from pyngrok import ngrok

In [None]:
# Build a SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Padrón")\
        .config("spark.sql.execution.arrow.enabled", "false")\
        .config('spark.ui.port', '4050')\
        .enableHiveSupport()\
        .getOrCreate()

In [None]:
# Print the SparkSession
spark

In [None]:
# Open tunnel on the port 4050 to get a public URL
ngrok.connect(4050)



<NgrokTunnel: "http://1b6f8e3bd89b.ngrok.io" -> "http://localhost:4050">

In [None]:
# Define schema for our data using the StructType method
schema = StructType([
  # Define a StructField for each field
    StructField('cod_distrito', StringType(), False),
    StructField('desc_distrito', StringType(), False),
    StructField('cod_dist_barrio', StringType(), False),
    StructField('desc_barrio', StringType(), False),
    StructField('cod_barrio', StringType(), False),
    StructField('cod_dist_seccion', StringType(), False),
    StructField('cod_seccion', StringType(), False),
    StructField('cod_edad', IntegerType(), False),
    StructField('esp_hombres', IntegerType(), True),
    StructField('esp_mujeres', IntegerType(), True),
    StructField('ext_hombres', IntegerType(), True),
    StructField('ext_mujeres', IntegerType(), True)
])

In [None]:
# Load data into a dataframe
padron_raw = (spark.read.format('csv')
            .options(header=True, 
                     delimiter=';',
                     encoding='ISO-8859-1',
                     emptyValue=0,
                     ignoreLeadingWhiteSpace=True,
                     ignoreTrailingWhiteSpace=True
                    )
            .schema(schema)
            .load(file_path))

In [None]:
# Print the schema of the dataframe
padron_raw.printSchema()

# Count the number of rows 
print("There are {} rows in the DataFrame. \n".format(padron_raw.count()))

root
 |-- cod_distrito: string (nullable = true)
 |-- desc_distrito: string (nullable = true)
 |-- cod_dist_barrio: string (nullable = true)
 |-- desc_barrio: string (nullable = true)
 |-- cod_barrio: string (nullable = true)
 |-- cod_dist_seccion: string (nullable = true)
 |-- cod_seccion: string (nullable = true)
 |-- cod_edad: integer (nullable = true)
 |-- esp_hombres: integer (nullable = true)
 |-- esp_mujeres: integer (nullable = true)
 |-- ext_hombres: integer (nullable = true)
 |-- ext_mujeres: integer (nullable = true)

There are 237675 rows in the DataFrame. 



In [None]:
# Get count of null values in Pyspark
padron_raw.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) \
                   for c in padron_raw.columns[:8]]).show()

padron_raw.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) \
                   for c in padron_raw.columns[8:]]).show()

+------------+-------------+---------------+-----------+----------+----------------+-----------+--------+
|cod_distrito|desc_distrito|cod_dist_barrio|desc_barrio|cod_barrio|cod_dist_seccion|cod_seccion|cod_edad|
+------------+-------------+---------------+-----------+----------+----------------+-----------+--------+
|           0|            0|              0|          0|         0|               0|          0|       0|
+------------+-------------+---------------+-----------+----------+----------------+-----------+--------+

+-----------+-----------+-----------+-----------+
|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|
+-----------+-----------+-----------+-----------+
|          0|          0|          0|          0|
+-----------+-----------+-----------+-----------+



In [None]:
# Show the first 10 rows of the dataframe
padron_raw.show(10)

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+--------+-----------+-----------+-----------+-----------+
|cod_distrito|       desc_distrito|cod_dist_barrio|         desc_barrio|cod_barrio|cod_dist_seccion|cod_seccion|cod_edad|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+--------+-----------+-----------+-----------+-----------+
|           1|CENTRO              |            101|PALACIO             |         1|            1006|          6|     103|          0|          1|          0|          0|
|           1|CENTRO              |            101|PALACIO             |         1|            1007|          7|       0|          1|          1|          0|          3|
|           1|CENTRO              |            101|PALACIO             |         1|            1007|          7|       1|          2|          3|     

In [None]:
padron_raw.filter(F.col("desc_distrito") == "CENTRO").show(3)

+------------+-------------+---------------+-----------+----------+----------------+-----------+--------+-----------+-----------+-----------+-----------+
|cod_distrito|desc_distrito|cod_dist_barrio|desc_barrio|cod_barrio|cod_dist_seccion|cod_seccion|cod_edad|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|
+------------+-------------+---------------+-----------+----------+----------------+-----------+--------+-----------+-----------+-----------+-----------+
+------------+-------------+---------------+-----------+----------+----------------+-----------+--------+-----------+-----------+-----------+-----------+



In [None]:
# Limpiamos DataFrame eliminando espacios y columnas innecesarias
padronDF = padron_raw.select(
  F.trim(F.col("desc_distrito")).alias("distrito"),
  F.trim(F.col("desc_barrio")).alias("barrio"),
  "cod_edad","esp_hombres", "esp_mujeres", "ext_hombres", "ext_mujeres")

# Alternative:
# padron_raw.select([(F.trim(col[0])).alias(col[0]) if col[1] == "string" else col[0] \
#                 for col in padron_raw.select("*").dtypes])

padronDF.show(10)

+--------+-------+--------+-----------+-----------+-----------+-----------+
|distrito| barrio|cod_edad|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|
+--------+-------+--------+-----------+-----------+-----------+-----------+
|  CENTRO|PALACIO|     103|          0|          1|          0|          0|
|  CENTRO|PALACIO|       0|          1|          1|          0|          3|
|  CENTRO|PALACIO|       1|          2|          3|          0|          0|
|  CENTRO|PALACIO|       2|          1|          4|          0|          0|
|  CENTRO|PALACIO|       3|          4|          0|          0|          0|
|  CENTRO|PALACIO|       4|          1|          2|          0|          1|
|  CENTRO|PALACIO|       5|          2|          6|          0|          0|
|  CENTRO|PALACIO|       6|          1|          0|          0|          0|
|  CENTRO|PALACIO|       7|          3|          2|          0|          0|
|  CENTRO|PALACIO|       8|          4|          2|          0|          0|
+--------+--

In [None]:
padronDF.filter(F.col("distrito") == "CENTRO").show(3)

+--------+-------+--------+-----------+-----------+-----------+-----------+
|distrito| barrio|cod_edad|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|
+--------+-------+--------+-----------+-----------+-----------+-----------+
|  CENTRO|PALACIO|     103|          0|          1|          0|          0|
|  CENTRO|PALACIO|       0|          1|          1|          0|          3|
|  CENTRO|PALACIO|       1|          2|          3|          0|          0|
+--------+-------+--------+-----------+-----------+-----------+-----------+
only showing top 3 rows



#### Número de barrios por distrito

In [None]:
padronDF.groupBy("distrito").agg(
    (F.countDistinct("barrio")).alias("total_barrios"))\
    .orderBy("total_barrios", ascending=False)\
    .show()

+-------------------+-------------+
|           distrito|total_barrios|
+-------------------+-------------+
|      CIUDAD LINEAL|            9|
|SAN BLAS-CANILLEJAS|            8|
|             LATINA|            8|
|FUENCARRAL-EL PARDO|            8|
|    MONCLOA-ARAVACA|            7|
|        CARABANCHEL|            7|
|             TETUAN|            7|
|              USERA|            7|
|         ARGANZUELA|            7|
|          SALAMANCA|            6|
|             CENTRO|            6|
|             RETIRO|            6|
|          HORTALEZA|            6|
| PUENTE DE VALLECAS|            6|
|           CHAMBERI|            6|
|          MORATALAZ|            6|
|          CHAMARTIN|            6|
|            BARAJAS|            5|
|         VILLAVERDE|            5|
|          VICALVARO|            4|
+-------------------+-------------+
only showing top 20 rows



### ¿Qué porcentaje de españoles y extranjeros hay por distrito y barrio?

In [None]:
padronDF = (padronDF
                .withColumn('total_esp', F.col('esp_hombres') + F.col('esp_mujeres'))
                .withColumn('total_ext', F.col('ext_hombres') + F.col('ext_mujeres'))
                .withColumn('total', F.col('total_esp') + F.col('total_ext'))
               )

padronDF.show(10)

+--------+-------+--------+-----------+-----------+-----------+-----------+---------+---------+-----+
|distrito| barrio|cod_edad|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|total_esp|total_ext|total|
+--------+-------+--------+-----------+-----------+-----------+-----------+---------+---------+-----+
|  CENTRO|PALACIO|     103|          0|          1|          0|          0|        1|        0|    1|
|  CENTRO|PALACIO|       0|          1|          1|          0|          3|        2|        3|    5|
|  CENTRO|PALACIO|       1|          2|          3|          0|          0|        5|        0|    5|
|  CENTRO|PALACIO|       2|          1|          4|          0|          0|        5|        0|    5|
|  CENTRO|PALACIO|       3|          4|          0|          0|          0|        4|        0|    4|
|  CENTRO|PALACIO|       4|          1|          2|          0|          1|        3|        1|    4|
|  CENTRO|PALACIO|       5|          2|          6|          0|          0|       

In [None]:
padronDF_grouped = padronDF.groupby('distrito','barrio')\
    .agg(
            (F.round(100 * F.sum('total_esp') / F.sum('total'), 2)).alias('pct_esp'),
            (F.round(100 * F.sum('total_ext') / F.sum('total'), 2)).alias('pct_ext')
    )\
    .orderBy('pct_ext', ascending=False)

padronDF_grouped.show(10)

+------------------+-------------+-------+-------+
|          distrito|       barrio|pct_esp|pct_ext|
+------------------+-------------+-------+-------+
|             USERA|   PRADOLONGO|  62.16|  37.84|
|        VILLAVERDE|SAN CRISTOBAL|  63.52|  36.48|
|PUENTE DE VALLECAS|    SAN DIEGO|  67.14|  32.86|
|            CENTRO|          SOL|  68.02|  31.98|
|             USERA|     MOSCARDO|  70.23|  29.77|
|            CENTRO|  EMBAJADORES|   70.5|   29.5|
|             USERA|  ALMENDRALES|  70.64|  29.36|
|             USERA|        ZOFIO|  72.95|  27.05|
|            CENTRO|       CORTES|  74.14|  25.86|
|       CARABANCHEL|PUERTA BONITA|  74.34|  25.66|
+------------------+-------------+-------+-------+
only showing top 10 rows



### ¿Cuál es la edad media de los ciudadanos por barrio?

In [None]:
(padronDF
 .select("barrio", "cod_edad")
 .groupBy("barrio")
 .agg((F.round(F.avg("cod_edad"),2)).alias("avg_edad"))
 .orderBy("avg_edad", ascending=False)
 .show(10)
)

+--------------+--------+
|        barrio|avg_edad|
+--------------+--------+
|    EL PLANTIO|   50.05|
|HISPANOAMERICA|    49.7|
|    GAZTAMBIDE|   49.52|
|       ALMAGRO|   49.49|
|    RIOS ROSAS|   49.45|
|  VALLEHERMOSO|   49.42|
| CASA DE CAMPO|   49.39|
|    CONCEPCION|   49.38|
|         IBIZA|    49.3|
|     ARGUELLES|    49.2|
+--------------+--------+
only showing top 10 rows



### Ejercicios

#### 6.10 Uso de groupBy
Lanza una consulta contra el DF resultante en la que muestre el número total de "espanoleshombres", "espanolesmujeres", extranjeroshombres" y "extranjerosmujeres" para cada barrio de cada distrito. 

Las columnas distrito y barrio deben ser las primeras en aparecer en el show. Los resultados deben estar ordenados en orden de más a menos según la columna "extranjerosmujeres" y desempatarán por la columna "extranjeroshombres".

In [None]:
padronDF.groupBy('distrito','barrio')\
    .agg(
         (F.sum("esp_hombres")).alias("esp_hombres"), 
         (F.sum("esp_mujeres")).alias("esp_mujeres"),
         (F.sum("ext_hombres")).alias("ext_hombres"),
         (F.sum("ext_mujeres")).alias("ext_mujeres")
     )\
    .orderBy("ext_mujeres", "ext_hombres", ascending=False)\
    .show()

+-------------------+--------------------+-----------+-----------+-----------+-----------+
|           distrito|              barrio|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|
+-------------------+--------------------+-----------+-----------+-----------+-----------+
| PUENTE DE VALLECAS|           SAN DIEGO|      13903|      15587|       7066|       7367|
|             LATINA|              ALUCHE|      25257|      29932|       5592|       6609|
|      CIUDAD LINEAL|        PUEBLO NUEVO|      23542|      27511|       5615|       6517|
|        CARABANCHEL|        VISTA ALEGRE|      15998|      19492|       5589|       6291|
| PUENTE DE VALLECAS|            NUMANCIA|      17290|      19641|       5584|       5907|
|             CENTRO|         EMBAJADORES|      16694|      16809|       8230|       5787|
|         VILLAVERDE|VILLAVERDE ALTO C.H.|      17102|      19110|       5182|       5470|
|             LATINA|    PUERTA DEL ANGEL|      15250|      17837|       4211|       4930|

#### 6.12 Uso de join

Crea un nuevo DataFrame a partir del original que muestre únicamente una columna con DESC_BARRIO, otra con DESC_DISTRITO y otra con el número total de "espanoleshombres" residentes en cada distrito de cada barrio. Únelo (con un join) con el DataFrame original a través de las columnas en común.

In [None]:
aggDF = padronDF.groupby("distrito", "barrio")\
  .agg(F.sum("esp_hombres").alias("total_esp_hombres"))

padronDF.join(aggDF, ["distrito","barrio"]).show(10)

#https://stackoverflow.com/questions/46944493/removing-duplicate-columns-after-a-df-join-in-spark

+--------+-------+--------+-----------+-----------+-----------+-----------+---------+---------+-----+-----------------+
|distrito| barrio|cod_edad|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|total_esp|total_ext|total|total_esp_hombres|
+--------+-------+--------+-----------+-----------+-----------+-----------+---------+---------+-----+-----------------+
|  CENTRO|PALACIO|     103|          0|          1|          0|          0|        1|        0|    1|             9380|
|  CENTRO|PALACIO|       0|          1|          1|          0|          3|        2|        3|    5|             9380|
|  CENTRO|PALACIO|       1|          2|          3|          0|          0|        5|        0|    5|             9380|
|  CENTRO|PALACIO|       2|          1|          4|          0|          0|        5|        0|    5|             9380|
|  CENTRO|PALACIO|       3|          4|          0|          0|          0|        4|        0|    4|             9380|
|  CENTRO|PALACIO|       4|          1| 

#### 6.13 Uso de funciones de ventana
Repite el ejercicio anterior utilizando funciones de ventana. (over(Window.partitionBy.....)).

Información interesante:
* https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86
* https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
* https://sparkbyexamples.com/spark/spark-sql-window-functions/

In [None]:
from pyspark.sql.window import Window

windowSpecAgg = Window.partitionBy("distrito", "barrio")

padronDF.withColumn("total_esp_hombres",
                    F.sum(F.col("esp_hombres")).over(windowSpecAgg))\
                    .filter(F.col("barrio") == "PALACIO")\
                    .show(10)

+--------+-------+--------+-----------+-----------+-----------+-----------+---------+---------+-----+-----------------+
|distrito| barrio|cod_edad|esp_hombres|esp_mujeres|ext_hombres|ext_mujeres|total_esp|total_ext|total|total_esp_hombres|
+--------+-------+--------+-----------+-----------+-----------+-----------+---------+---------+-----+-----------------+
|  CENTRO|PALACIO|     103|          0|          1|          0|          0|        1|        0|    1|             9380|
|  CENTRO|PALACIO|       0|          1|          1|          0|          3|        2|        3|    5|             9380|
|  CENTRO|PALACIO|       1|          2|          3|          0|          0|        5|        0|    5|             9380|
|  CENTRO|PALACIO|       2|          1|          4|          0|          0|        5|        0|    5|             9380|
|  CENTRO|PALACIO|       3|          4|          0|          0|          0|        4|        0|    4|             9380|
|  CENTRO|PALACIO|       4|          1| 

#### 6.14 Uso de tablas Pivot
Mediante una función Pivot muestra una tabla (que va a ser una tabla de contingencia) que contenga los valores medios de `espanolesmujeres` para cada barrio y en cada rango de edad (COD_EDAD_INT). Los barrios incluidos deben ser únicamente CENTRO, BARAJAS y RETIRO y deben figurar como columnas.

In [None]:
# Sin usar la función pivot
padronDF.filter("""distrito in ("CENTRO", "BARAJAS", "RETIRO")""")\
  .groupBy("cod_edad", "distrito")\
  .mean("esp_mujeres")\
  .orderBy("cod_edad")\
  .show(10)

+--------+--------+------------------+
|cod_edad|distrito|  avg(esp_mujeres)|
+--------+--------+------------------+
|       0|  CENTRO|2.3545454545454545|
|       0|  RETIRO|3.4193548387096775|
|       0| BARAJAS| 5.483870967741935|
|       1| BARAJAS| 5.774193548387097|
|       1|  RETIRO|3.9361702127659575|
|       1|  CENTRO|2.3423423423423424|
|       2| BARAJAS| 6.741935483870968|
|       2|  CENTRO|2.3394495412844036|
|       2|  RETIRO| 4.258064516129032|
|       3|  CENTRO|2.2181818181818183|
+--------+--------+------------------+
only showing top 10 rows



In [None]:
# Usando la función pivot
padronDF.groupBy("cod_edad")\
  .pivot("distrito", ["BARAJAS", "CENTRO", "RETIRO"])\
  .agg(F.round(F.mean("esp_mujeres"),2))\
  .orderBy("cod_edad")\
  .show(10)

+--------+-------+------+------+
|cod_edad|BARAJAS|CENTRO|RETIRO|
+--------+-------+------+------+
|       0|   5.48|  2.35|  3.42|
|       1|   5.77|  2.34|  3.94|
|       2|   6.74|  2.34|  4.26|
|       3|   7.58|  2.22|  4.53|
|       4|   8.06|  2.24|  4.64|
|       5|   8.23|  2.27|  4.59|
|       6|   7.84|  2.28|  4.77|
|       7|   8.71|  2.13|  4.59|
|       8|   8.13|  2.39|  4.66|
|       9|   8.58|  2.41|  4.54|
+--------+-------+------+------+
only showing top 10 rows



#### 6.15 Uso de operaciones vectoriales
Utilizando este nuevo DF, crea 3 columnas nuevas que hagan referencia a qué porcentaje de la población total de "espanolesmujeres" de cada rango de edad representa cada uno de los tres distritos. 

Debe estar redondeada a 2 decimales. Puedes imponerte la condición extra de no apoyarte en ninguna columna auxiliar creada para el caso.

In [None]:
padronDF_edad = padronDF.groupBy("cod_edad")\
  .pivot("distrito", ["BARAJAS", "CENTRO", "RETIRO"])\
  .sum("esp_mujeres")\
  .orderBy("cod_edad")

padronDF_edad.show(5)

+--------+-------+------+------+
|cod_edad|BARAJAS|CENTRO|RETIRO|
+--------+-------+------+------+
|       0|    170|   259|   318|
|       1|    179|   260|   370|
|       2|    209|   255|   396|
|       3|    235|   244|   426|
|       4|    250|   244|   436|
+--------+-------+------+------+
only showing top 5 rows



In [None]:
padronDF_edad.select("*",
                     (F.round(100 * (F.col("BARAJAS") / (F.col("BARAJAS") + F.col("CENTRO") + F.col("RETIRO"))),2)).alias("% BARAJAS"),
                     (F.round(100 * (F.col("CENTRO") / (F.col("BARAJAS") + F.col("CENTRO") + F.col("RETIRO"))),2)).alias("% CENTRO"),
                     (F.round(100 * (F.col("RETIRO") / (F.col("BARAJAS") + F.col("CENTRO") + F.col("RETIRO"))),2)).alias("% RETIRO")).show(10)

+--------+-------+------+------+---------+--------+--------+
|cod_edad|BARAJAS|CENTRO|RETIRO|% BARAJAS|% CENTRO|% RETIRO|
+--------+-------+------+------+---------+--------+--------+
|       0|    170|   259|   318|    22.76|   34.67|   42.57|
|       1|    179|   260|   370|    22.13|   32.14|   45.74|
|       2|    209|   255|   396|     24.3|   29.65|   46.05|
|       3|    235|   244|   426|    25.97|   26.96|   47.07|
|       4|    250|   244|   436|    26.88|   26.24|   46.88|
|       5|    255|   250|   431|    27.24|   26.71|   46.05|
|       6|    243|   251|   448|     25.8|   26.65|   47.56|
|       7|    270|   232|   431|    28.94|   24.87|    46.2|
|       8|    252|   263|   438|    26.44|    27.6|   45.96|
|       9|    266|   265|   422|    27.91|   27.81|   44.28|
+--------+-------+------+------+---------+--------+--------+
only showing top 10 rows



#### 6.16 Uso de particionado

Guarda el archivo CSV particionado por distrito y por barrio (en ese orden) en un directorio local. Consulta el directorio para ver la estructura de los ficheros y comprueba que es la esperada.

In [None]:
%fs
ls /FileStore/datasets

/content


In [None]:
padronDF.write.partitionBy("distrito","barrio").mode("overwrite").csv("/FileStore/datasets/padron.csv", header=True)

In [None]:
%fs 
ls /FileStore/datasets/padron.csv

In [None]:
spark.read.csv("/FileStore/datasets/padron.csv/distrito=ARGANZUELA/barrio=ACACIAS", header=True).show(5)

## Análisis usando SQL

In [None]:
# Create a local temporal table from DataFrame
padronDF.createOrReplaceTempView('padron_ltv')

# Print the tables in the catalog
print(spark.catalog.listTables())

[Table(name='padron_ltv', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [None]:
query = """
SELECT
  distrito AS DISTRITO,
  barrio AS BARRIO,
  ROUND(100 * SUM(total_esp)/SUM(total), 2) AS PCT_ESP,
  ROUND(100 * SUM(total_ext)/SUM(total), 2) AS PCT_EXT
  FROM padron_ltv
  GROUP BY distrito, barrio
  ORDER BY pct_ext DESC
  LIMIT 10
"""
spark.sql(query).show()

+------------------+-------------+-------+-------+
|          DISTRITO|       BARRIO|PCT_ESP|PCT_EXT|
+------------------+-------------+-------+-------+
|             USERA|   PRADOLONGO|  62.16|  37.84|
|        VILLAVERDE|SAN CRISTOBAL|  63.52|  36.48|
|PUENTE DE VALLECAS|    SAN DIEGO|  67.14|  32.86|
|            CENTRO|          SOL|  68.02|  31.98|
|             USERA|     MOSCARDO|  70.23|  29.77|
|            CENTRO|  EMBAJADORES|   70.5|   29.5|
|             USERA|  ALMENDRALES|  70.64|  29.36|
|             USERA|        ZOFIO|  72.95|  27.05|
|            CENTRO|       CORTES|  74.14|  25.86|
|       CARABANCHEL|PUERTA BONITA|  74.34|  25.66|
+------------------+-------------+-------+-------+



### Hive

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS datos_padron")
spark.sql("USE datos_padron")
spark.sql("SHOW DATABASES").show()

+------------+
|   namespace|
+------------+
|datos_padron|
|     default|
+------------+



In [None]:
spark.sql("SHOW TABLES").show()

+------------+----------+-----------+
|    database| tableName|isTemporary|
+------------+----------+-----------+
|datos_padron|padron_raw|      false|
+------------+----------+-----------+



In [None]:
spark.sql("DROP TABLE IF EXISTS padron_raw;")

query = """
CREATE TABLE datos_padron.padron_raw
(
	cod_distrito STRING,
	desc_distrito STRING,
	cod_dist_barrio STRING,
	desc_barrio STRING,
	cod_barrio STRING,
	cod_dist_seccion STRING,
	cod_seccion STRING,
	cod_edad INT,
	espanoles_hombres INT,
	espanoles_mujeres INT,
	extranjeros_hombres INT,
	extranjeros_mujeres INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
WITH SERDEPROPERTIES
(
    "separatorChar" = ";",
    "quoteChar" = '\"'
)
TBLPROPERTIES (
    "skip.header.line.count"="1",
    "seriealization.null.format"=""
);
"""
spark.sql(query)

spark.sql("LOAD DATA LOCAL INPATH '/content/padron_madrid_2020.csv' INTO TABLE padron_raw")

DataFrame[]

In [None]:
spark.sql("SELECT * FROM padron_raw LIMIT 5").show()

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+-----------------+-----------------+-------------------+-------------------+
|cod_distrito|       desc_distrito|cod_dist_barrio|         desc_barrio|cod_barrio|cod_dist_seccion|cod_seccion|    cod_edad|espanoles_hombres|espanoles_mujeres|extranjeros_hombres|extranjeros_mujeres|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+-----------------+-----------------+-------------------+-------------------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT| EspanolesHombres| EspanolesMujeres| ExtranjerosHombres| ExtranjerosMujeres|
|           1|CENTRO              |            101|PALACIO             |         1|            1006|          6|         103|                 |                1|                   |           

In [None]:
# Check the number of rows
spark.sql("SELECT COUNT(*) AS total_rows FROM padron_raw").show()

+----------+
|total_rows|
+----------+
|    237676|
+----------+



In [None]:
spark.sql("DROP TABLE IF EXISTS padron_clean")

query = """
CREATE TABLE datos_padron.padron_clean
AS 
SELECT
  cod_distrito,
  TRIM(desc_distrito) AS distrito,
  cod_dist_barrio,
  TRIM(desc_barrio) AS barrio,
  cod_barrio,
  cod_dist_seccion,
  cod_seccion,
  CAST(cod_edad AS INT),
  CAST(espanoles_hombres AS INT),
  CAST(espanoles_mujeres AS INT),
  CAST(extranjeros_hombres AS INT),
  CAST(extranjeros_mujeres AS INT)
FROM datos_padron.padron_raw
WHERE cod_distrito != "COD_DISTRITO"
"""
spark.sql(query)

DataFrame[]

In [None]:
spark.sql("SELECT * FROM padron_clean LIMIT 5").show()

+------------+--------+---------------+-------+----------+----------------+-----------+--------+-----------------+-----------------+-------------------+-------------------+
|cod_distrito|distrito|cod_dist_barrio| barrio|cod_barrio|cod_dist_seccion|cod_seccion|cod_edad|espanoles_hombres|espanoles_mujeres|extranjeros_hombres|extranjeros_mujeres|
+------------+--------+---------------+-------+----------+----------------+-----------+--------+-----------------+-----------------+-------------------+-------------------+
|           1|  CENTRO|            101|PALACIO|         1|            1006|          6|     103|             null|                1|               null|               null|
|           1|  CENTRO|            101|PALACIO|         1|            1007|          7|       0|                1|                1|               null|                  3|
|           1|  CENTRO|            101|PALACIO|         1|            1007|          7|       1|                2|                3|   

In [None]:
# Check the number of rows
spark.sql("SELECT COUNT(*) AS total_rows FROM padron_clean").show()

+----------+
|total_rows|
+----------+
|    237675|
+----------+



In [None]:
spark.sql("SHOW TABLES").show()

+------------+------------+-----------+
|    database|   tableName|isTemporary|
+------------+------------+-----------+
|datos_padron|padron_clean|      false|
|datos_padron|  padron_raw|      false|
+------------+------------+-----------+



In [None]:
query = """
WITH total AS (
SELECT
    distrito,
    barrio,
    nvl(espanoles_hombres,0) + nvl(espanoles_mujeres,0) AS total_espanoles,
    nvl(extranjeros_hombres,0) + nvl(extranjeros_mujeres,0) AS total_extranjeros
FROM padron_clean
)

SELECT
  distrito AS DISTRITO,
  barrio AS BARRIO,
  ROUND(100 * SUM(total_espanoles)/SUM(total_espanoles + total_extranjeros), 2) AS PCT_ESP,
  ROUND(100 * SUM(total_extranjeros)/SUM(total_espanoles + total_extranjeros), 2) AS PCT_EXT
  FROM total
  GROUP BY distrito, barrio
  ORDER BY pct_ext DESC
  LIMIT 10
"""
spark.sql(query).show()

+------------------+-------------+-------+-------+
|          DISTRITO|       BARRIO|PCT_ESP|PCT_EXT|
+------------------+-------------+-------+-------+
|             USERA|   PRADOLONGO|  62.16|  37.84|
|        VILLAVERDE|SAN CRISTOBAL|  63.52|  36.48|
|PUENTE DE VALLECAS|    SAN DIEGO|  67.14|  32.86|
|            CENTRO|          SOL|  68.02|  31.98|
|             USERA|     MOSCARDO|  70.23|  29.77|
|            CENTRO|  EMBAJADORES|   70.5|   29.5|
|             USERA|  ALMENDRALES|  70.64|  29.36|
|             USERA|        ZOFIO|  72.95|  27.05|
|            CENTRO|       CORTES|  74.14|  25.86|
|       CARABANCHEL|PUERTA BONITA|  74.34|  25.66|
+------------------+-------------+-------+-------+



## Referencias

* https://www.datasciencemadesimple.com/pyspark-tutorial/
* https://sparkbyexamples.com/pyspark/
* https://mungingdata.com/pyspark/