## Un poquito de Spark.

Esta parte se realiza con Azure databricks y pyspark para probar dicho entorno. El propio databricks te crea una localización en forma de tabla tras subir el archivo CSV.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# File location and type
file_location = "/FileStore/tables/Rango_Edades_Seccion_202112.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ";"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location) \
  .na.fill(value=0) \
  .withColumn("DESC_DISTRITO",F.trim(F.col("DESC_DISTRITO"))) \
  .withColumn("DESC_BARRIO",F.trim(F.col("DESC_BARRIO")))

#df.show(n=5)
display(df.limit(5))

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres
1,CENTRO,101,PALACIO,1,1001,1,0,2,3,1,0
1,CENTRO,101,PALACIO,1,1001,1,1,7,0,1,0
1,CENTRO,101,PALACIO,1,1001,1,2,2,3,0,5
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,0,0
1,CENTRO,101,PALACIO,1,1001,1,4,2,0,1,3


In [0]:
#Crea una vista temporal de nombre "padron" y a través de ella cuenta el número de barrios diferentes que hay.

temp_table_name = "Rango_Edades_Seccion_202112_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
#Enumera todos los barrios diferentes.

df1 = df.select(df.DESC_DISTRITO).distinct()

#df1.show(n=5)
display(df1.limit(5))

DESC_DISTRITO
SALAMANCA
RETIRO
CENTRO
CHAMARTIN
ARGANZUELA


In [0]:
%sql

/* Enumera todos los barrios diferentes. */

select distinct(DESC_DISTRITO) from `Rango_Edades_Seccion_202112_csv`

DESC_DISTRITO
LATINA
TETUAN
SALAMANCA
RETIRO
MONCLOA-ARAVACA
HORTALEZA
PUENTE DE VALLECAS
VILLAVERDE
CHAMBERI
CIUDAD LINEAL


In [0]:
#Crea una nueva columna que muestre la longitud de los campos de la columna DESC_DISTRITO y que se llame "longitud".

df2 = df.withColumn("longitud", F.length(df.DESC_DISTRITO))

#df2.show(n=5)
display(df2.limit(5))

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,longitud
1,CENTRO,101,PALACIO,1,1001,1,0,2,3,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,1,7,0,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,2,2,3,0,5,6
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,4,2,0,1,3,6


In [0]:
#Crea una nueva columna que muestre el valor 5 para cada uno de los registros de la tabla.

df3 = df2.withColumn("valor_5", F.lit(5))

#df3.show(n=5)
display(df3.limit(5))

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,longitud,valor_5
1,CENTRO,101,PALACIO,1,1001,1,0,2,3,1,0,6,5
1,CENTRO,101,PALACIO,1,1001,1,1,7,0,1,0,6,5
1,CENTRO,101,PALACIO,1,1001,1,2,2,3,0,5,6,5
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,0,0,6,5
1,CENTRO,101,PALACIO,1,1001,1,4,2,0,1,3,6,5


In [0]:
#Borra esta columna.

df4 = df3.drop('valor_5')

#df4.show(n=5)
display(df4.limit(5))

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,longitud
1,CENTRO,101,PALACIO,1,1001,1,0,2,3,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,1,7,0,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,2,2,3,0,5,6
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,4,2,0,1,3,6


In [0]:
#Particiona el DataFrame por las variables DESC_DISTRITO y DESC_BARRIO.
#Almacénalo en caché. Consulta en el puerto 4040 (UI de Spark) de tu usuario local el estado de los rdds almacenados.

df5 = df4.repartition(F.col("DESC_BARRIO"), F.col("DESC_DISTRITO")).cache()

#df5.show(n=5)
#window  = Window.partitionBy( F.col("DESC_DISTRITO"), F.col("DESC_BARRIO"))
display(df5.limit(5))

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,longitud
2,ARGANZUELA,202,ACACIAS,2,2085,85,26,2,1,0,1,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,27,5,5,0,1,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,28,6,7,2,2,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,29,7,10,1,1,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,30,10,7,0,1,10


In [0]:
#Lanza una consulta contra el DF resultante en la que muestre el número total de "espanoleshombres", "espanolesmujeres", extranjeroshombres" y "extranjerosmujeres" para cada barrio de cada distrito. Las columnas distrito y barrio deben ser las primeras en aparecer en el show. Los resultados deben estar ordenados en orden de más a menos según la columna "extranjerosmujeres" y desempatarán por la columna "extranjeroshombres".

df6 = df5.groupBy("DESC_BARRIO", "DESC_DISTRITO").agg(F.sum(F.col('espanoleshombres').cast('int')).alias('espanoleshombres'), F.sum(F.col('espanolesmujeres').cast('int')).alias('espanolesmujeres'),  F.sum(F.col('extranjeroshombres').cast('int')).alias('extranjeroshombres'), F.sum(F.col('extranjerosmujeres').cast('int')).alias('extranjerosmujeres') ).orderBy("extranjerosmujeres", "extranjeroshombres").cache()

display(df6.limit(5))

DESC_BARRIO,DESC_DISTRITO,espanoleshombres,espanolesmujeres,extranjeroshombres,extranjerosmujeres
ATOCHA,ARGANZUELA,738,750,60,73
EL PARDO,FUENCARRAL-EL PARDO,1626,1623,70,93
ATALAYA,CIUDAD LINEAL,607,809,83,105
FUENTELARREINA,FUENCARRAL-EL PARDO,1445,1679,72,171
EL PLANTIO,MONCLOA-ARAVACA,1252,1431,135,203


In [0]:
#Elimina el registro en caché.

df6.unpersist()

In [0]:
#Crea un nuevo DataFrame a partir del original que muestre únicamente una columna con DESC_BARRIO, otra con DESC_DISTRITO y otra con el número total de "espanoleshombres" residentes en cada distrito de cada barrio. Únelo (con un join) con el DataFrame original a través de las columnas en común.

df7 = df.groupBy("DESC_BARRIO", "DESC_DISTRITO").agg(F.sum(F.col('espanoleshombres').cast('int')).alias('espanoleshombres')).cache()

display(df7.limit(5))

df_join = df7.join( df6  , (df6.DESC_BARRIO  == df7.DESC_BARRIO) & (df6.DESC_DISTRITO  == df7.DESC_DISTRITO) ) 


display(df_join.limit(5))


DESC_BARRIO,DESC_DISTRITO,espanoleshombres
ACACIAS,ARGANZUELA,15399
VALVERDE,FUENCARRAL-EL PARDO,26922
FUENTELARREINA,FUENCARRAL-EL PARDO,1445
PAVONES,MORATALAZ,3584
EL GOLOSO,FUENCARRAL-EL PARDO,8947


DESC_BARRIO,DESC_DISTRITO,espanoleshombres,DESC_BARRIO.1,DESC_DISTRITO.1,espanoleshombres.1,espanolesmujeres,extranjeroshombres,extranjerosmujeres
ACACIAS,ARGANZUELA,15399,ACACIAS,ARGANZUELA,15399,18073,1355,1507
VALVERDE,FUENCARRAL-EL PARDO,26922,VALVERDE,FUENCARRAL-EL PARDO,26922,29105,3675,4441
FUENTELARREINA,FUENCARRAL-EL PARDO,1445,FUENTELARREINA,FUENCARRAL-EL PARDO,1445,1679,72,171
PAVONES,MORATALAZ,3584,PAVONES,MORATALAZ,3584,4321,419,478
EL GOLOSO,FUENCARRAL-EL PARDO,8947,EL GOLOSO,FUENCARRAL-EL PARDO,8947,9076,551,630


In [0]:
#Repite la función anterior utilizando funciones de ventana. (over(Window.partitionBy.....)).

window = Window.partitionBy( F.col("DESC_BARRIO"), F.col("DESC_DISTRITO"))

df_window = df4.withColumn( "espanoleshombres", F.sum(F.col('espanoleshombres').cast('int')).over(window))

#jdbcDF_window = df4.select(F.col("DESC_BARRIO"), F.col("DESC_DISTRITO"), F.col("espanoleshombres"), F.col("espanolesmujeres") , F.col("extranjeroshombres"), F.col("extranjerosmujeres") , F.col("extranjerosmujeres")).agg(F.sum(F.col('espanoleshombres').cast('int')).alias('espanoleshombres'), F.sum(F.col('espanolesmujeres').cast('int')).alias('espanolesmujeres'), F.sum(F.col('extranjeroshombres').cast('int')).alias('extranjeroshombres'), F.sum(F.col('extranjerosmujeres').cast('int')).alias('extranjerosmujeres') ).withColumn( "rn", row_number().over(window)).cache()


display(df_window.limit(5))


COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,espanoleshombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,longitud
2,ARGANZUELA,202,ACACIAS,2,2085,85,26,15399,1,0,1,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,27,15399,5,0,1,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,28,15399,7,2,2,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,29,15399,10,1,1,10
2,ARGANZUELA,202,ACACIAS,2,2085,85,30,15399,7,0,1,10


In [0]:
%sql

/*Mediante una función Pivot muestra una tabla (que va a ser una tabla de contingencia) que contenga los valores totales ()la suma de valores) de espanolesmujeres para cada distrito y en cada rango de edad (COD_EDAD_INT). Los distritos incluidos deben ser únicamente CENTRO, BARAJAS y RETIRO y deben figurar como columnas . El aspecto debe ser similar a este: */


select COD_EDAD_INT,DESC_DISTRITO, SUM(espanolesmujeres) as espanolesmujeres 
from `Rango_Edades_Seccion_202112_csv`
WHERE DESC_DISTRITO IN ('CENTRO', 'BARAJAS' , 'RETIRO')
group by COD_EDAD_INT, DESC_DISTRITO
order by COD_EDAD_INT
limit 10;




COD_EDAD_INT,DESC_DISTRITO,espanolesmujeres
0,BARAJAS,146
0,CENTRO,240
0,RETIRO,294
1,RETIRO,346
1,BARAJAS,199
1,CENTRO,243
2,RETIRO,343
2,BARAJAS,180
2,CENTRO,223
3,CENTRO,229


In [0]:
pivotDF = df.where( F.col("DESC_DISTRITO").isin('CENTRO', 'BARAJAS' , 'RETIRO')).groupBy("COD_EDAD_INT").pivot("DESC_DISTRITO").sum("espanolesmujeres").orderBy("COD_EDAD_INT")

display(pivotDF.limit(10))

COD_EDAD_INT,BARAJAS,CENTRO,RETIRO
0,146,240,294
1,199,243,346
2,180,223,343
3,204,229,383
4,231,228,419
5,243,231,423
6,257,257,427
7,244,241,435
8,268,223,428
9,245,254,430


In [0]:
#Utilizando este nuevo DF, crea 3 columnas nuevas que hagan referencia a qué porcentaje de la suma de "espanolesmujeres" en los tres distritos para cada rango de edad representa cada uno de los tres distritos. Debe estar redondeada a 2 decimales. Puedes imponerte la condición extra de no apoyarte en ninguna columna auxiliar creada para el caso.
df8 = df.where( F.col("DESC_DISTRITO").isin('CENTRO', 'BARAJAS' , 'RETIRO')).groupBy("COD_EDAD_INT").pivot("DESC_DISTRITO").sum("espanolesmujeres").orderBy("COD_EDAD_INT") \
  .withColumn("PBARAJAS", F.col("BARAJAS") / df.where(F.col("DESC_DISTRITO") == 'BARAJAS').groupBy("DESC_DISTRITO").sum("espanolesmujeres"))



In [0]:
#Guarda el archivo csv original particionado por distrito y por barrio (en ese orden) en un directorio local. Consulta el directorio para ver la estructura de los ficheros y comprueba que es la esperada.

df.write.option("header", True) \
        .partitionBy("DESC_BARRIO", "DESC_DISTRITO") \
        .mode("overwrite") \
        .saveAsTable("datos_padron")


In [0]:
%sql
use default;
SHOW tables;    


In [0]:
#eliminar un directorio con archivos 

dbutils.fs.rm("/tmp/prueba", recurse=True)


In [0]:
#Haz el mismo guardado pero en formato parquet. Compara el peso del archivo con el resultado anterior.

df.write.format("parquet").mode("overwrite").partitionBy("DESC_BARRIO", "DESC_DISTRITO").save("/tmp/datos_padron_parquet")


In [0]:
#ver el tamaño del directorio 

display(dbutils.fs.ls("/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL"))

'''path,name,size
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/_SUCCESS,_SUCCESS,0
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/_committed_423211157586517047,_committed_423211157586517047,224
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/_started_423211157586517047,_started_423211157586517047,0
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/part-00000-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1572-1.c000.snappy.parquet,part-00000-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1572-1.c000.snappy.parquet,5035
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/part-00002-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1574-1.c000.snappy.parquet,part-00002-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1574-1.c000.snappy.parquet,8720 '''


display(dbutils.fs.ls("/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES"))

'''path,name,size
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/_SUCCESS,_SUCCESS,0
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/_committed_3778510453995229431,_committed_3778510453995229431,202
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/_started_3778510453995229431,_started_3778510453995229431,0
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/part-00000-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-166-8.c000.csv,part-00000-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-166-8.c000.csv,15297
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/part-00002-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-168-1.c000.csv,part-00002-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-168-1.c000.csv,48210'''

#la particion de parquet es 1/5 la del csv  

path,name,size
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/_SUCCESS,_SUCCESS,0
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/_committed_423211157586517047,_committed_423211157586517047,224
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/_started_423211157586517047,_started_423211157586517047,0
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/part-00000-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1572-1.c000.snappy.parquet,part-00000-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1572-1.c000.snappy.parquet,5035
dbfs:/tmp/datos_padron_parquet/DESC_BARRIO=ABRANTES/DESC_DISTRITO=CARABANCHEL/part-00002-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1574-1.c000.snappy.parquet,part-00002-tid-423211157586517047-c37f9832-682c-4445-8601-ac2969fc6d95-1574-1.c000.snappy.parquet,8720


path,name,size
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/_SUCCESS,_SUCCESS,0
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/_committed_3778510453995229431,_committed_3778510453995229431,202
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/_started_3778510453995229431,_started_3778510453995229431,0
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/part-00000-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-166-8.c000.csv,part-00000-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-166-8.c000.csv,15297
dbfs:/datos_padron/DESC_DISTRITO=CARABANCHEL/DESC_BARRIO=ABRANTES/part-00002-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-168-1.c000.csv,part-00002-tid-3778510453995229431-22a99c7a-e9fd-44ce-b96d-58b768893371-168-1.c000.csv,48210


In [0]:
#Por último, prueba a hacer los ejercicios sugeridos en la parte de Hive con el csv "Datos Padrón" (incluyendo la importación con Regex) utilizando desde Spark EXCLUSIVAMENTE sentencias spark.sql, es decir, importar los archivos desde local directamente como tablas de Hive y haciendo todas las consultas sobre estas tablas sin transformarlas en ningún momento en DataFrames ni DataSets.



In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "Rango_Edades_Seccion_202112_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)