## Un poquito de Spark.

Esta parte se realiza con Azure databricks y pyspark para probar dicho entorno. El propio databricks te crea una localización en forma de tabla tras subir el archivo CSV.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# File location and type
file_location = "/FileStore/tables/Rango_Edades_Seccion_202112.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ";"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location) \
  .na.fill(value=0) \
  .withColumn("DESC_DISTRITO",F.trim(col("DESC_DISTRITO"))) \
  .withColumn("DESC_BARRIO",F.trim(col("DESC_BARRIO")))

display(df)

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres
1,CENTRO,101,PALACIO,1,1001,1,0,2,3,1,0
1,CENTRO,101,PALACIO,1,1001,1,1,7,0,1,0
1,CENTRO,101,PALACIO,1,1001,1,2,2,3,0,5
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,0,0
1,CENTRO,101,PALACIO,1,1001,1,4,2,0,1,3
1,CENTRO,101,PALACIO,1,1001,1,5,2,3,0,0
1,CENTRO,101,PALACIO,1,1001,1,6,1,0,2,1
1,CENTRO,101,PALACIO,1,1001,1,7,1,2,0,0
1,CENTRO,101,PALACIO,1,1001,1,8,3,2,1,2
1,CENTRO,101,PALACIO,1,1001,1,9,3,0,0,1


In [0]:
#Crea una vista temporal de nombre "padron" y a través de ella cuenta el número de barrios diferentes que hay.

temp_table_name = "Rango_Edades_Seccion_202112_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
#Enumera todos los barrios diferentes.

df1 = df.select(df.DESC_DISTRITO).distinct()

display(df1)

DESC_DISTRITO
LATINA
TETUAN
SALAMANCA
RETIRO
MONCLOA-ARAVACA
HORTALEZA
PUENTE DE VALLECAS
VILLAVERDE
CHAMBERI
CIUDAD LINEAL


In [0]:
%sql

/* Enumera todos los barrios diferentes. */

select distinct(DESC_DISTRITO) from `Rango_Edades_Seccion_202112_csv`

DESC_DISTRITO
LATINA
TETUAN
SALAMANCA
RETIRO
MONCLOA-ARAVACA
HORTALEZA
PUENTE DE VALLECAS
VILLAVERDE
CHAMBERI
CIUDAD LINEAL


In [0]:
#Crea una nueva columna que muestre la longitud de los campos de la columna DESC_DISTRITO y que se llame "longitud".

df2 = df.withColumn("longitud", F.length(df.DESC_DISTRITO))

display(df2)


COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,longitud
1,CENTRO,101,PALACIO,1,1001,1,0,2,3,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,1,7,0,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,2,2,3,0,5,6
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,4,2,0,1,3,6
1,CENTRO,101,PALACIO,1,1001,1,5,2,3,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,6,1,0,2,1,6
1,CENTRO,101,PALACIO,1,1001,1,7,1,2,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,8,3,2,1,2,6
1,CENTRO,101,PALACIO,1,1001,1,9,3,0,0,1,6


In [0]:
#Crea una nueva columna que muestre el valor 5 para cada uno de los registros de la tabla.

df3 = df.withColumn("valor_5", F.lit(5))

display(df3)

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,valor_5
1,CENTRO,101,PALACIO,1,1001,1,0,2,3,1,0,5
1,CENTRO,101,PALACIO,1,1001,1,1,7,0,1,0,5
1,CENTRO,101,PALACIO,1,1001,1,2,2,3,0,5,5
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,0,0,5
1,CENTRO,101,PALACIO,1,1001,1,4,2,0,1,3,5
1,CENTRO,101,PALACIO,1,1001,1,5,2,3,0,0,5
1,CENTRO,101,PALACIO,1,1001,1,6,1,0,2,1,5
1,CENTRO,101,PALACIO,1,1001,1,7,1,2,0,0,5
1,CENTRO,101,PALACIO,1,1001,1,8,3,2,1,2,5
1,CENTRO,101,PALACIO,1,1001,1,9,3,0,0,1,5


In [0]:
#Borra esta columna.

df4 = df3.drop('valor_5')

display(df4)

In [0]:
#Particiona el DataFrame por las variables DESC_DISTRITO y DESC_BARRIO.

df5 = df4.repartition(col("DESC_BARRIO"), col("DESC_DISTRITO"))

display(df5)
#window  = Window.partitionBy( F.col("DESC_DISTRITO"), F.col("DESC_BARRIO"))


COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres
2,ARGANZUELA,202,ACACIAS,2,2085,85,26,2,1,0,1
2,ARGANZUELA,202,ACACIAS,2,2085,85,27,5,5,0,1
2,ARGANZUELA,202,ACACIAS,2,2085,85,28,6,7,2,2
2,ARGANZUELA,202,ACACIAS,2,2085,85,29,7,10,1,1
2,ARGANZUELA,202,ACACIAS,2,2085,85,30,10,7,0,1
2,ARGANZUELA,202,ACACIAS,2,2085,85,31,8,9,0,0
2,ARGANZUELA,202,ACACIAS,2,2085,85,32,9,10,1,1
2,ARGANZUELA,202,ACACIAS,2,2085,85,33,7,6,1,1
2,ARGANZUELA,202,ACACIAS,2,2085,85,34,7,9,1,0
2,ARGANZUELA,202,ACACIAS,2,2085,85,35,15,12,0,0


In [0]:
#Almacénalo en caché. Consulta en el puerto 4040 (UI de Spark) de tu usuario local el estado de los rdds almacenados.

df5.cache()


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "Rango_Edades_Seccion_202112_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)