#### Práctica Padrón Madrid

##### Hive

In [0]:
%sql

CREATE DATABASE datos_padron;

In [0]:
%sql

CREATE TABLE padron_txt ( 
    cod_distrito INT, desc_distrito STRING,
    cod_dist_barrio INT, desc_barrio STRING, cod_barrio INT,
    cod_dist_seccion INT, cod_seccion INT, cod_edad_int INT,
    EspanolesHombres INT, EspanolesMujeres INT, 
    ExtranjerosHombres INT, ExtranjerosMujeres INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = '\073',
    "quoteChar" = '"',
    "escapeChar" = "\\"
    )
STORED AS TEXTFILE
TBLPROPERTIES (
    'skip.header.line.count' = '1');

In [0]:
%sql

LOAD DATA LOCAL INPATH '/home/cloudera/ejercicios/Rango_Edades_Seccion_202204.csv'
INTO TABLE padron_txt;

In [0]:
%sql

CREATE TABLE padron_txt_2 
AS SELECT trim(cod_distrito), trim(desc_distrito),
    trim(cod_dist_barrio), trim(desc_barrio), trim(cod_barrio),
    trim(cod_dist_seccion), trim(cod_seccion), trim(cod_edad_int),
    trim(EspanolesHombres), trim(EspanolesMujeres), 
    trim(ExtranjerosHombres), trim(ExtranjerosMujeres)
FROM padron_txt;

LOAD DATA INPATH command id used to load data into hive table. "LOCAL" means that the input file is on the local
file system. If "LOCAL" is omitted then it looks for the file in HDFS.

In [0]:
%sql

Insert overwrite table padron_txt
select cod_distrito, desc_distrito,
    cod_dist_barrio, desc_barrio, cod_barrio,
    cod_dist_seccion, cod_seccion, cod_edad,
    EspanolesMujeres, 
    extranjeroshombres, extranjerosmujeres, 
case when length(espanoleshombres) = 0 then 0 else espanoleshombres end as espanoleshombres
from padron_txt;

In [0]:
%sql

--- Expresiones regulares (regex)

CREATE TABLE padron_txt_2 ( 
    cod_distrito INT, desc_distrito STRING,
    cod_dist_barrio INT, desc_barrio STRING, cod_barrio INT,
    cod_dist_seccion INT, cod_seccion INT, cod_edad_int INT,
    EspanolesHombres INT, EspanolesMujeres INT, 
    ExtranjerosHombres INT, ExtranjerosMujeres INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' 
WITH SERDEPROPERTIES (
"input.regex"='"(.*)"\073"([A-Za-z]*) *"\073"(.*)"\073"([A-Za-z]*) *"\073"(.*)"\073"(.*?)"\073"(.*?)"\073"(.*?)"\073"(.*?)"\073"(.*?)"\073"(.*?)"\073"(.*?)"') 
STORED AS TEXTFILE 
TBLPROPERTIES ("skip.header.line.count"="1");

##### Parquet

CTAS = create table as select statement (in Hive, tables can also be created and populated by the results of a query)

In [0]:
%sql

create table padron_parquet stored as parquet as select * from padron_txt;

create table padron_parquet_2 stored as parquet as select * from padron_txt_2;

Formato PARQUET
No es un fichero en texto plano (se representa de forma binaria), lo que significa que no lo podemos abrir y examinar con un simple editor de texto. 
Es un formato orientado a columnas (csv y avro son orientados a filas).
Cada columna se almacena de forma independiente y cada columna es accesible de forma independiente al resto. 
Es un formato autodescriptivo, que integra el esquema o la estructura dentro de los datos en sí. De esta forma, cualquier programa que 
se utilice para leer los datos puede acceder a estos metadatos. 
--> Genera menores costes de almacenamiento para archivos de datos y maximiza la efectividad de las consultas de datos

Comparar tamaño ficheros de la base de datos padrón alojados en hdfs:
hadoop fs -ls /user/hive/warehouse/datos_padron.db

* -rwxrwxrwx   1 cloudera supergroup     934657 2022-05-13 09:54 /user/hive/warehouse/datos_padron.db/padron_parquet/000000_0
* -rwxrwxrwx   1 cloudera supergroup     932354 2022-05-13 09:56 /user/hive/warehouse/datos_padron.db/padron_parquet_2/000000_0
* -rwxrwxrwx   1 cloudera supergroup   22689999 2022-05-12 10:27 /user/hive/warehouse/datos_padron.db/padron_txt/000000_0
* -rwxrwxrwx   1 cloudera supergroup   12179631 2022-05-11 17:27 /user/hive/warehouse/datos_padron.db/padron_txt_2/000000_0

##### Impala

###### IMPALA 
Es un motor SQL pensado para operar sobre grandes volúmenes de datos. Corre sobre clusters hadoop.
Ejecuta las consultas sobre el cluster en lugar de ejecutar MapReduce para procesar, es más rápido que Hive o Pig. 

Hive e Impala --> ambas herramientas usan variantes de SQL, comparten el mismo metastore e data warehouse en el cluster, en grandes proyectos se usan juntas. 

Diferencias --> Hive soporta más funcionalidades y tiene tolerancia a fallos (procesamiento basado en MapReduce, sin un nodo 
falla otro se hace cargo). Latencia alta. Mejor para trabajos pesados, si no buscamos velocidad y queremos tolerancia a fallos. 
Imapala no tiene tolerancia a fallos (si un nodo falla, toda la ejecución falla, hay que volver a lanzar la consulta), no es
compatible con MapReduce, ejecuta las consultas sobre el cluster. Latencia baja. Mejora para consultas ligeras, si buscas 
velocidad.

INVALIDATE METADATA marks the metadata for one or all tables as stale. 
The next time the Impala service performs a query against a table whose metadata is invalidated, Impala reloads the associated metadata before the query proceeds. 
As this is a very expensive operation compared to the incremental metadata update done by the REFRESH statement, when possible, prefer REFRESH rather than INVALIDATE METADATA.

INVALIDATE METADATA is required when the following changes are made outside of Impala, in Hive and other Hive client, such as SparkSQL:
Metadata of existing tables changes.
New tables are added, and Impala will use the tables.
The SERVER or DATABASE level Ranger privileges are changed.
Block metadata changes, but the files remain the same (HDFS rebalance).
UDF jars change.
Some tables are no longer queried, and you want to remove their metadata from the catalog and coordinator caches to reduce memory requirements.

Once issued, the INVALIDATE METADATA statement cannot be cancelled.

INVALIDATE METADATA [[db_name.]table_name]

In [0]:
%sql

INVALIDATE METADATA padron_txt;
INVALIDATE METADATA padron_txt_2;
INVALIDATE METADATA padron_parquet;
INVALIDATE METADATA padron_parquet_2;


In [0]:
%sql

CREATE TABLE padron_txt_3 STORED AS TEXTFILE AS SELECT
cod_distrito, desc_distrito,cod_dist_barrio, desc_barrio, cod_barrio,
cod_dist_seccion, cod_seccion, cod_edad_int, NVL(espanoleshombres, 0) AS espanoleshombres,
NVL(espanolesmujeres, 0) AS espanolesmujeres,
NVL(extranjeroshombres, 0) AS extranjeroshombres,
NVL(extranjerosmujeres, 0) AS extranjerosmujeres
FROM padron_TXT_2;

In [0]:
%sql

create table padron_parquet_3 stored as parquet as select * from padron_txt_3;

In [0]:
%sql

-- padron_parquet_3

select desc_barrio, desc_distrito, sum(espanoleshombres) as total_espanoleshombres, sum(espanolesmujeres) as total_espanolesmujeres, 
sum(extranjeroshombres) as total_extranjeroshombres, sum(extranjerosmujeres) as total_extranjerosmujeres
from padron_parquet_3
group by desc_barrio, desc_distrito;

-- padron_txt_3

select desc_barrio, desc_distrito, sum(espanoleshombres) as total_espanoleshombres, sum(espanolesmujeres) as total_espanolesmujeres, 
sum(extranjeroshombres) as total_extranjeroshombres, sum(extranjerosmujeres) as total_extranjerosmujeres
from padron_txt_3
group by desc_barrio, desc_distrito;

desc_barrio,desc_distrito,total_espanoleshombres,total_espanolesmujeres,total_extranjeroshombres,total_extranjerosmujeres


In [0]:
%sql

-- En Imapala

INVALIDATE METADATA padron_parquet_3
INVALIDATE METADATA padron_txt_3 

select desc_barrio, desc_distrito, sum(espanoleshombres) as total_espanoleshombres, sum(espanolesmujeres) as total_espanolesmujeres,  
sum(extranjeroshombres) as total_extranjeroshombres, sum(extranjerossmujeres) as total_extranjerosmujeres 
from padron_parquet_3 
group by desc_barrio, desc_distrito;

select desc_barrio, desc_distrito, sum(espanoleshombres) as total_espanoleshombres, sum(espanolesmujeres) as total_espanolesmujeres, 
sum(extranjeroshombres) as total_extranjeroshombres, sum(extranjerossmujeres) as total_extranjerosmujeres
from padron_txt_3
group by desc_barrio, desc_distrito;

-- La consulta en Impala se ejecuta mucho más rápida. No he visto mucha diferencia entre las consultas con la tabla parquet y la tabla txt. 

##### Tablas particionadas

In [0]:
%sql

-- En Imapala

INVALIDATE METADATA padron_parquet_3
INVALIDATE METADATA padron_txt_3 

select desc_barrio, desc_distrito, sum(espanoleshombres) as total_espanoleshombres, sum(espanolesmujeres) as total_espanolesmujeres,  
sum(extranjeroshombres) as total_extranjeroshombres, sum(extranjerossmujeres) as total_extranjerosmujeres 
from padron_parquet_3 
group by desc_barrio, desc_distrito;

select desc_barrio, desc_distrito, sum(espanoleshombres) as total_espanoleshombres, sum(espanolesmujeres) as total_espanolesmujeres, 
sum(extranjeroshombres) as total_extranjeroshombres, sum(extranjerossmujeres) as total_extranjerosmujeres
from padron_txt_3
group by desc_barrio, desc_distrito;

-- La consulta en Impala se ejecuta mucho más rápida. No he visto mucha diferencia entre las consultas con la tabla parquet y la tabla txt. 

In [0]:
%sql

-- Calcular el total de EspanolesHombres, EspanolesMujeres, ExtranjerosHombres y ExtranjerosMujeres agrupado por DESC_DISTRITO y DESC_BARRIO para los distritos CENTRO, LATINA, CHAMARTIN, TETUAN, VICALVARO y BARAJAS.

select desc_distrito, desc_barrio, sum(espanoleshombres) AS total_espanoleshombres,
sum(espanolesmujeres) AS total_espanolesmujeres, 
sum(extranjeroshombres) AS total_extranjeroshombres,
sum(extranjerosmujeres) AS total_extranjeromujeres
FROM padron_particionado
GROUP BY desc_distrito, desc_barrio
HAVING desc_distrito = 'CENTRO' or desc_distrito = 'LATINA' or desc_distrito = 'CHAMARTIN'
or desc_distrito = 'TETUAN' or desc_distrito = 'VICALVARO' or desc_distrito = 'BARAJAS';

-- Consulta en Hive sobre la tabla padron_particionado 41.13s 
-- Consulta en Hive sobre la tabla padron_parquet_3 34.36s

-- Consulta en Impala sobre la tabla padron_particionado 2.54s
-- Consulta en Impala sobre la tabla padron_parquet_3 5.3s


desc_distrito,desc_barrio,total_espanoleshombres,total_espanolesmujeres,total_extranjeroshombres,total_extranjeromujeres


In [0]:
%sql

-- Hive, tabla padron_parquet_3 24.72s
-- Hive, tabla padron_particionado 26.44s
-- Hive, tabla padron_txt_3 24.39s
-- Impala, tabla padron_parquet_3 0s

select desc_distrito, desc_barrio, max(espanoleshombres) AS max_espanoleshombres,
max(espanolesmujeres) AS max_espanolesmujeres, 
max(extranjeroshombres) AS max_extranjeroshombres,
max(extranjerosmujeres) AS max_extranjeromujeres
FROM padron_parquet_3
GROUP BY desc_distrito, desc_barrio
HAVING desc_distrito = 'CENTRO' or desc_distrito = 'LATINA' or desc_distrito = 'CHAMARTIN'
or desc_distrito = 'TETUAN' or desc_distrito = 'VICALVARO' or desc_distrito = 'BARAJAS';

-- Hive, tabla padron_txt_3 24.85s
-- Hive, tabla padron_parquet_3 24.64s
-- Hive, tabla padron_particionado 25.44s
-- Impala, tabla padron_txt_3 4.79s

select desc_distrito, desc_barrio, min(espanoleshombres) AS min_espanoleshombres,
min(espanolesmujeres) AS min_espanolesmujeres, 
min(extranjeroshombres) AS min_extranjeroshombres,
min(extranjerosmujeres) AS min_extranjeromujeres
FROM padron_txt_3
GROUP BY desc_distrito, desc_barrio
HAVING desc_distrito = 'CENTRO' or desc_distrito = 'LATINA' or desc_distrito = 'CHAMARTIN'
or desc_distrito = 'TETUAN' or desc_distrito = 'VICALVARO' or desc_distrito = 'BARAJAS';

-- Hive, tabla padron_particionado 34.4s
-- Hive, tabla padron_parquet_3 31.97s
-- Hive, tabla padron_txt_3 34.55s
-- Impala, tabla padron_particionado 0s

select desc_distrito, desc_barrio, avg(espanoleshombres) AS avg_espanoleshombres,
avg(espanolesmujeres) AS avg_espanolesmujeres, 
avg(extranjeroshombres) AS avg_extranjeroshombres,
avg(extranjerosmujeres) AS avg_extranjeromujeres
FROM padron_particionado
GROUP BY desc_distrito, desc_barrio
HAVING desc_distrito = 'CENTRO' or desc_distrito = 'LATINA' or desc_distrito = 'CHAMARTIN'
or desc_distrito = 'TETUAN' or desc_distrito = 'VICALVARO' or desc_distrito = 'BARAJAS';

-- Hive, tabla padron_particionado 41.79s
-- Hive, tabla padron_parquet_3 36.41s
-- Hive, tabla padron_txt_3 36.46s
-- Impala, tabla padron_particionado 0s
-- Impala, tabla padron_parquet_3 0s
-- Impala, tabla padron_txt_3 0s

select desc_distrito, desc_barrio, count(espanoleshombres) AS count_espanoleshombres,
count(espanolesmujeres) AS count_espanolesmujeres, 
count(extranjeroshombres) AS count_extranjeroshombres,
count(extranjerosmujeres) AS count_extranjeromujeres
FROM padron_particionado
GROUP BY desc_distrito, desc_barrio
HAVING desc_distrito = 'CENTRO' or desc_distrito = 'LATINA' or desc_distrito = 'CHAMARTIN'
or desc_distrito = 'TETUAN' or desc_distrito = 'VICALVARO' or desc_distrito = 'BARAJAS';


desc_distrito,desc_barrio,count_espanoleshombres,count_espanolesmujeres,count_extranjeroshombres,count_extranjeromujeres


##### Tablas en HDFS

Tablas en HDFS

Tablas externas: no gestionadas
Tablas internas: gestionadas

En local de la MV crea dos ficheros: datos1.txt y datos2.txt

Crear un directorio en HDFS:

* [cloudera@quickstart ~]$ hdfs dfs -mkdir /test

Mueve el fichero datos1.txt al directorio creado en HDFS con un comando desde la consola:

* [cloudera@quickstart ejercicios]$ hadoop fs -put /home/cloudera/Desktop/ejercicios/datos1.txt /test

In [0]:
%sql

-- En HIVE:

CREATE DATABASE numeros; 

CREATE TABLE numeros_tbl (
col_1 INT, col_2 INT, col_3 INT)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',';

-- Carga los datos del fichero datos1.txt almacenado en HDFS

LOAD DATA INPATH '/test/datos1.txt' INTO TABLE numeros_tbl;

-- En la carpeta hdfs dfs -ls /test ya no está el fichero datos1.txt
-- Ahora los datos se encuentran aquí: [cloudera@quickstart ejercicios]$ hdfs dfs -ls /user/hive/warehouse/numeros.db/numeros_tbl

DROP TABLE numeros_tbl;

-- Ahora ya no existe la carpeta numeros_tbl con los datos: [cloudera@quickstart ejercicios]$ hdfs dfs -ls /user/hive/warehouse/numeros.db
-- La carpeta donde estaba almacenado el fichero datos1.txt sigue vacía [cloudera@quickstart ejercicios]$ hdfs dfs -ls /test

-- Vuelvo a mover el fichero datos1.txt desde local al directorio HDFS: [cloudera@quickstart ejercicios]$ hadoop fs -put /home/cloudera/Desktop/ejercicios/datos1.txt /test

In [0]:
%sql

-- En Hive:

CREATE EXTERNAL TABLE numeros_tbl_2 (
col_1 INT, col_2 INT, col_3 INT)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',';

LOAD DATA INPATH '/test/datos1.txt' INTO TABLE numeros_tbl_2;

-- En la carpeta hdfs dfs -ls /test ya no está el fichero datos1.txt
-- [cloudera@quickstart ~]$ hdfs dfs -ls /user/hive/warehouse/numeros.db
-- Found 1 items drwxrwxrwx   - cloudera supergroup          0 2022-05-17 09:19 /user/hive/warehouse/numeros.db/numeros_tbl_2

DROP TABLE numeros_tbl_2;

-- La carpeta sigue en el warehouse de hive: [cloudera@quickstart ~]$ hdfs dfs -ls /user/hive/warehouse/numeros.db/numeros_tbl_2
-- La carpeta hdfs dfs -ls /test está vacía, ya no está el fichero datos1.txt

-- Vuelvo a subir datos1 en el directorio test: [cloudera@quickstart ~]$ hadoop fs -put /home/cloudera/Desktop/ejercicios/datos1.txt /test

CREATE EXTERNAL TABLE numeros_tbl_3 (
col_1 INT, col_2 INT, col_3 INT)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
LOCATION '/test';

SELECT * FROM numeros_tbl_3;
-- En la tabla aparecen todos los registros de datos1, no he usado el comando load data inpath. 

-- Subo a la carpeta /test también el fichero datos2: hadoop fs -put /home/cloudera/Desktop/ejercicios/datos2.txt /test

SELECT * FROM numeros_tbl_3;
-- Ahora en la tabla aparecen también todos los registros del fichero datos2.


-- Los datos de una tabla externa no se borran en HDFS al borrar la tabla. Una tabla externa necesita especificar la ubicación de la tabla. 

Hive internal table:

* It is the default table in Hive. 
* Hive owns the data for the internal tables.
* By default, an internal table will be created in a folder path similar to /user/hive/warehouse directory of HDFS. We can override the default location by the location property during table creation.
* If we drop the managed table or partition, the table data and the metadata associated with that table will be deleted from the HDFS.

Hive external table:

* Hive does not manage the data of the External table.
* External tables are stored outside the warehouse directory. They can access data stored in sources such as remote HDFS locations or Azure Storage Volumes.
* Whenever we drop the external table, then only the metadata associated with the table will get deleted, the table data remains untouched by Hive.
* We can create the external table by specifying the EXTERNAL keyword in the Hive create table statement.

When you create external table and don't specify location, the data will be stored in the hive default location: /hive/warehouse/<database_name>.db/<table_name>

##### Spark

In [0]:
from pyspark.sql import SparkSession 

padron_file = "/FileStore/tables/Rango_Edades_Seccion_202204.csv"

spark = (SparkSession
 .builder
 .appName("PadronMadrid")
 .getOrCreate())

df = (spark.read.format("csv")
 .option("inferSchema", "true")
 .option("header", "true")
 .option("quote", '"')
 .option("escape", "\\")
 .option("sep", "\073" )
 .load(padron_file))

In [0]:
print(df.printSchema())

root
 |-- COD_DISTRITO: integer (nullable = true)
 |-- DESC_DISTRITO: string (nullable = true)
 |-- COD_DIST_BARRIO: integer (nullable = true)
 |-- DESC_BARRIO: string (nullable = true)
 |-- COD_BARRIO: integer (nullable = true)
 |-- COD_DIST_SECCION: integer (nullable = true)
 |-- COD_SECCION: integer (nullable = true)
 |-- COD_EDAD_INT: integer (nullable = true)
 |-- EspanolesHombres: integer (nullable = true)
 |-- EspanolesMujeres: integer (nullable = true)
 |-- ExtranjerosHombres: integer (nullable = true)
 |-- ExtranjerosMujeres: integer (nullable = true)

None


In [0]:
df.show(10)

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|CENTRO              |            101|PALACIO             |         1|            1001|          1|           0|               3|               1|                 1|              null|
|           1|CENTRO              |            101|PALACIO             |         1|            1001|          1|           1|               5|               2|              null|              null|
|         

In [0]:
padron_df = df.na.fill(0)

In [0]:
padron_df.show(10)

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|CENTRO              |            101|PALACIO             |         1|            1001|          1|           0|               3|               1|                 1|                 0|
|           1|CENTRO              |            101|PALACIO             |         1|            1001|          1|           1|               5|               2|                 0|                 0|
|         

In [0]:
from pyspark.sql.functions import trim

padron_df = padron_df.withColumn("DESC_DISTRITO", trim(padron_df.DESC_DISTRITO))
padron_df = padron_df.withColumn("DESC_BARRIO", trim(padron_df.DESC_BARRIO))

In [0]:
padron_df.show(10)

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           0|               3|               1|                 1|                 0|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           1|               5|               2|                 0|                 0|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|      

In [0]:
padron_df.select("DESC_BARRIO").distinct().show()
padron_df.select("DESC_BARRIO").distinct().count()

+----------------+
|     DESC_BARRIO|
+----------------+
|        VALVERDE|
|          CORTES|
|       TRAFALGAR|
|FUENTE DEL BERRO|
|        PACIFICO|
|  CUATRO CAMINOS|
|        EL PILAR|
|       ARGUELLES|
|   VALDEACEDERAS|
|    VALLEHERMOSO|
|           IBIZA|
|     EMBAJADORES|
|      CASTELLANA|
|  HISPANOAMERICA|
|         ACACIAS|
|    LOS CARMENES|
|         CHOPERA|
|         PALACIO|
|        JUSTICIA|
|     UNIVERSIDAD|
+----------------+
only showing top 20 rows

Out[123]: 132

In [0]:
# Crea una vista temporal

padron_df.createOrReplaceTempView("padron")

In [0]:
spark.sql("""SELECT COUNT (DISTINCT DESC_BARRIO) AS numero_barrios
              FROM padron""").show()

+--------------+
|numero_barrios|
+--------------+
|           132|
+--------------+



In [0]:
# Crea una nueva columna que muestre la longitud de los campos de la columna DESC_DISTRITO

from pyspark.sql.functions import length

padron_df = padron_df.withColumn("longitud", length(padron_df.DESC_DISTRITO))

In [0]:
padron_df.show(10)

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|longitud|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           0|               3|               1|                 1|                 0|       6|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           1|               5|               2|                 0|                 0|       6|
|           1|       CENTRO|            101| 

In [0]:
padron_df.printSchema()

root
 |-- COD_DISTRITO: integer (nullable = true)
 |-- DESC_DISTRITO: string (nullable = true)
 |-- COD_DIST_BARRIO: integer (nullable = true)
 |-- DESC_BARRIO: string (nullable = true)
 |-- COD_BARRIO: integer (nullable = true)
 |-- COD_DIST_SECCION: integer (nullable = true)
 |-- COD_SECCION: integer (nullable = true)
 |-- COD_EDAD_INT: integer (nullable = true)
 |-- EspanolesHombres: integer (nullable = true)
 |-- EspanolesMujeres: integer (nullable = true)
 |-- ExtranjerosHombres: integer (nullable = true)
 |-- ExtranjerosMujeres: integer (nullable = true)
 |-- longitud: integer (nullable = true)



In [0]:
# Crea una nueva columna que muestre el valor 5 para cada registro de la tabla

from pyspark.sql.functions import lit

padron_df_5 = padron_df.withColumn("nueva_columna", lit(5))
padron_df_5.show(10)

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+-------------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|longitud|nueva_columna|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+-------------+
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           0|               3|               1|                 1|                 0|       6|            5|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           1|               5|               2|                 0|                 0

In [0]:
padron_df_5.printSchema()

root
 |-- COD_DISTRITO: integer (nullable = true)
 |-- DESC_DISTRITO: string (nullable = true)
 |-- COD_DIST_BARRIO: integer (nullable = true)
 |-- DESC_BARRIO: string (nullable = true)
 |-- COD_BARRIO: integer (nullable = true)
 |-- COD_DIST_SECCION: integer (nullable = true)
 |-- COD_SECCION: integer (nullable = true)
 |-- COD_EDAD_INT: integer (nullable = true)
 |-- EspanolesHombres: integer (nullable = true)
 |-- EspanolesMujeres: integer (nullable = true)
 |-- ExtranjerosHombres: integer (nullable = true)
 |-- ExtranjerosMujeres: integer (nullable = true)
 |-- longitud: integer (nullable = true)
 |-- nueva_columna: integer (nullable = false)



In [0]:
# Borra la nueva columna

padron_df_5 = padron_df_5.drop("nueva_columna")

In [0]:
padron_df_5.printSchema()

root
 |-- COD_DISTRITO: integer (nullable = true)
 |-- DESC_DISTRITO: string (nullable = true)
 |-- COD_DIST_BARRIO: integer (nullable = true)
 |-- DESC_BARRIO: string (nullable = true)
 |-- COD_BARRIO: integer (nullable = true)
 |-- COD_DIST_SECCION: integer (nullable = true)
 |-- COD_SECCION: integer (nullable = true)
 |-- COD_EDAD_INT: integer (nullable = true)
 |-- EspanolesHombres: integer (nullable = true)
 |-- EspanolesMujeres: integer (nullable = true)
 |-- ExtranjerosHombres: integer (nullable = true)
 |-- ExtranjerosMujeres: integer (nullable = true)
 |-- longitud: integer (nullable = true)



In [0]:
padron_df.write.option("header",True) \
        .option("inferSchema", "true") \
        .partitionBy("DESC_DISTRITO","DESC_BARRIO") \
        .mode("overwrite") \
        .csv("/tmp/output/padron_partition_2")

In [0]:
padron_partition_2 = spark.read.option("header",True)\
                 .option("inferSchema", "true")\
                 .csv("/tmp/output/padron_partition_2")

In [0]:
padron_partition_2.printSchema()

root
 |-- COD_DISTRITO: integer (nullable = true)
 |-- COD_DIST_BARRIO: integer (nullable = true)
 |-- COD_BARRIO: integer (nullable = true)
 |-- COD_DIST_SECCION: integer (nullable = true)
 |-- COD_SECCION: integer (nullable = true)
 |-- COD_EDAD_INT: integer (nullable = true)
 |-- EspanolesHombres: integer (nullable = true)
 |-- EspanolesMujeres: integer (nullable = true)
 |-- ExtranjerosHombres: integer (nullable = true)
 |-- ExtranjerosMujeres: integer (nullable = true)
 |-- longitud: integer (nullable = true)
 |-- DESC_DISTRITO: string (nullable = true)
 |-- DESC_BARRIO: string (nullable = true)



In [0]:
padron_cache = padron_partition_2.cache()

In [0]:
# 6.10

padron_cache.groupBy("DESC_DISTRITO","DESC_BARRIO")\
    .agg(
      sum("EspanolesHombres").alias("total_EspanolesHombres"),
      sum("EspanolesMujeres").alias("total_EspanolesMujeres"),
      sum("ExtranjerosHombres").alias("total_ExtranjerosHombres"),
      sum("ExtranjerosMujeres").alias("total_ExtranjerosMujeres"))\
    .sort(desc("total_ExtranjerosMujeres"), desc("total_ExtranjerosHombres"))\
    .show(10)


#padron_cache.groupBy("DESC_DISTRITO","DESC_BARRIO").sum("EspanolesHombres", "EspanolesMujeres", "ExtranjerosHombres", "ExtranjerosMujeres").show(10)

+------------------+--------------------+----------------------+----------------------+------------------------+------------------------+
|     DESC_DISTRITO|         DESC_BARRIO|total_EspanolesHombres|total_EspanolesMujeres|total_ExtranjerosHombres|total_ExtranjerosMujeres|
+------------------+--------------------+----------------------+----------------------+------------------------+------------------------+
|PUENTE DE VALLECAS|           SAN DIEGO|                 13908|                 15598|                    7145|                    7274|
|     CIUDAD LINEAL|        PUEBLO NUEVO|                 23080|                 27172|                    5750|                    6571|
|            LATINA|              ALUCHE|                 24808|                 29563|                    5586|                    6546|
|       CARABANCHEL|        VISTA ALEGRE|                 15796|                 19268|                    5715|                    6321|
|PUENTE DE VALLECAS|            NU

In [0]:
# Elimina el registro en caché

padron_partition_2.unpersist()

Out[138]: DataFrame[COD_DISTRITO: int, COD_DIST_BARRIO: int, COD_BARRIO: int, COD_DIST_SECCION: int, COD_SECCION: int, COD_EDAD_INT: int, EspanolesHombres: int, EspanolesMujeres: int, ExtranjerosHombres: int, ExtranjerosMujeres: int, longitud: int, DESC_DISTRITO: string, DESC_BARRIO: string]

In [0]:
padron_cache.groupBy("DESC_DISTRITO","DESC_BARRIO")\
    .agg(sum("EspanolesHombres").alias("total_EspanolesHombres"))\
    .join(padron_df, ["DESC_BARRIO", "DESC_DISTRITO"], "left")\
    .sort("DESC_DISTRITO")\
    .show(10)


+-----------+-------------+----------------------+------------+---------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|DESC_BARRIO|DESC_DISTRITO|total_EspanolesHombres|COD_DISTRITO|COD_DIST_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|longitud|
+-----------+-------------+----------------------+------------+---------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|     ATOCHA|   ARGANZUELA|                   754|           2|            207|         7|            2083|         83|           0|               3|               5|                 1|                 1|      10|
|     ATOCHA|   ARGANZUELA|                   754|           2|            207|         7|            2083|         83|           1|            

In [0]:
# 6.13)

from pyspark.sql.window import Window

padron_cache.select("DESC_DISTRITO", "DESC_BARRIO", sum("EspanolesHombres").over(Window.partitionBy("DESC_DISTRITO", "DESC_BARRIO")).alias("total_EspanolesHombres"))\
.join(padron_df, ["DESC_BARRIO", "DESC_DISTRITO"], "left")\
.sort("DESC_DISTRITO")\
.show(10)


+-----------+-------------+----------------------+------------+---------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|DESC_BARRIO|DESC_DISTRITO|total_EspanolesHombres|COD_DISTRITO|COD_DIST_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|longitud|
+-----------+-------------+----------------------+------------+---------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|     ATOCHA|   ARGANZUELA|                   754|           2|            207|         7|            2083|         83|           0|               3|               5|                 1|                 1|      10|
|     ATOCHA|   ARGANZUELA|                   754|           2|            207|         7|            2083|         83|           1|            

In [0]:
pivotDF = padron_cache.groupBy("COD_EDAD_INT")\
                      .pivot("DESC_DISTRITO", ["BARAJAS", "CENTRO", "RETIRO"])\
                      .sum("EspanolesMujeres").alias("mujeres")\
                      .sort("COD_EDAD_INT")


In [0]:
pivotDF.show(10)

+------------+-------+------+------+
|COD_EDAD_INT|BARAJAS|CENTRO|RETIRO|
+------------+-------+------+------+
|           0|    146|   262|   317|
|           1|    160|   240|   318|
|           2|    171|   200|   377|
|           3|    175|   216|   365|
|           4|    201|   239|   412|
|           5|    217|   224|   420|
|           6|    242|   231|   405|
|           7|    229|   237|   466|
|           8|    234|   228|   417|
|           9|    236|   235|   421|
+------------+-------+------+------+
only showing top 10 rows



In [0]:
# 6.15

from pyspark.sql.functions import round

pivotDF.withColumn("%_BARAJAS", round(col("BARAJAS")/(col("BARAJAS")+col("CENTRO")+col("RETIRO"))*100, 2))\
       .withColumn("%_CENTRO", round(col("CENTRO")/(col("BARAJAS")+col("CENTRO")+col("RETIRO"))*100, 2))\
       .withColumn("%_RETIRO", round(col("RETIRO")/(col("BARAJAS")+col("CENTRO")+col("RETIRO"))*100, 2))\
       .show(10)

+------------+-------+------+------+---------+--------+--------+
|COD_EDAD_INT|BARAJAS|CENTRO|RETIRO|%_BARAJAS|%_CENTRO|%_RETIRO|
+------------+-------+------+------+---------+--------+--------+
|           0|    146|   262|   317|    20.14|   36.14|   43.72|
|           1|    160|   240|   318|    22.28|   33.43|   44.29|
|           2|    171|   200|   377|    22.86|   26.74|    50.4|
|           3|    175|   216|   365|    23.15|   28.57|   48.28|
|           4|    201|   239|   412|    23.59|   28.05|   48.36|
|           5|    217|   224|   420|     25.2|   26.02|   48.78|
|           6|    242|   231|   405|    27.56|   26.31|   46.13|
|           7|    229|   237|   466|    24.57|   25.43|    50.0|
|           8|    234|   228|   417|    26.62|   25.94|   47.44|
|           9|    236|   235|   421|    26.46|   26.35|    47.2|
+------------+-------+------+------+---------+--------+--------+
only showing top 10 rows



In [0]:
padron_df.write.partitionBy("DESC_DISTRITO","DESC_BARRIO").format("parquet").save("padron.parquet")

In [0]:
padron_df.write.partitionBy("DESC_DISTRITO","DESC_BARRIO").format("csv").save("padron.csv")

In [0]:
dbutils.fs.ls("dbfs:/padron.csv/DESC_DISTRITO=CENTRO/DESC_BARRIO=CORTES/")

Out[245]: [FileInfo(path='dbfs:/padron.csv/DESC_DISTRITO=CENTRO/DESC_BARRIO=CORTES/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1652971630000),
 FileInfo(path='dbfs:/padron.csv/DESC_DISTRITO=CENTRO/DESC_BARRIO=CORTES/_committed_3456009251416974041', name='_committed_3456009251416974041', size=115, modificationTime=1652971630000),
 FileInfo(path='dbfs:/padron.csv/DESC_DISTRITO=CENTRO/DESC_BARRIO=CORTES/_started_3456009251416974041', name='_started_3456009251416974041', size=0, modificationTime=1652971606000),
 FileInfo(path='dbfs:/padron.csv/DESC_DISTRITO=CENTRO/DESC_BARRIO=CORTES/part-00000-tid-3456009251416974041-e854fcfa-d3ec-4ef3-bdbe-2c319a6eb30b-2422-12.c000.csv', name='part-00000-tid-3456009251416974041-e854fcfa-d3ec-4ef3-bdbe-2c319a6eb30b-2422-12.c000.csv', size=30653, modificationTime=1652971606000)]

In [0]:
%fs

ls dbfs:/padron.parquet/


path,name,size,modificationTime
dbfs:/padron.parquet/DESC_DISTRITO=ARGANZUELA/,DESC_DISTRITO=ARGANZUELA/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=BARAJAS/,DESC_DISTRITO=BARAJAS/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=CARABANCHEL/,DESC_DISTRITO=CARABANCHEL/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=CENTRO/,DESC_DISTRITO=CENTRO/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=CHAMARTIN/,DESC_DISTRITO=CHAMARTIN/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=CHAMBERI/,DESC_DISTRITO=CHAMBERI/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=CIUDAD LINEAL/,DESC_DISTRITO=CIUDAD LINEAL/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=FUENCARRAL-EL PARDO/,DESC_DISTRITO=FUENCARRAL-EL PARDO/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=HORTALEZA/,DESC_DISTRITO=HORTALEZA/,0,0
dbfs:/padron.parquet/DESC_DISTRITO=LATINA/,DESC_DISTRITO=LATINA/,0,0


In [0]:
%fs

ls dbfs:/padron.csv

path,name,size,modificationTime
dbfs:/padron.csv/DESC_DISTRITO=ARGANZUELA/,DESC_DISTRITO=ARGANZUELA/,0,0
dbfs:/padron.csv/DESC_DISTRITO=BARAJAS/,DESC_DISTRITO=BARAJAS/,0,0
dbfs:/padron.csv/DESC_DISTRITO=CARABANCHEL/,DESC_DISTRITO=CARABANCHEL/,0,0
dbfs:/padron.csv/DESC_DISTRITO=CENTRO/,DESC_DISTRITO=CENTRO/,0,0
dbfs:/padron.csv/DESC_DISTRITO=CHAMARTIN/,DESC_DISTRITO=CHAMARTIN/,0,0
dbfs:/padron.csv/DESC_DISTRITO=CHAMBERI/,DESC_DISTRITO=CHAMBERI/,0,0
dbfs:/padron.csv/DESC_DISTRITO=CIUDAD LINEAL/,DESC_DISTRITO=CIUDAD LINEAL/,0,0
dbfs:/padron.csv/DESC_DISTRITO=FUENCARRAL-EL PARDO/,DESC_DISTRITO=FUENCARRAL-EL PARDO/,0,0
dbfs:/padron.csv/DESC_DISTRITO=HORTALEZA/,DESC_DISTRITO=HORTALEZA/,0,0
dbfs:/padron.csv/DESC_DISTRITO=LATINA/,DESC_DISTRITO=LATINA/,0,0
