In [3]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
    .builder
    .appName("Padron")
    .getOrCreate()

import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2df8a826


In [4]:
val file = "./padron/Rango_Edades_Seccion_202104.csv"
val schema = "COD_DISTRITO int, DESC_DISTRITO string, COD_DIST_BARRIO int, DESC_BARRIO string, " +
"COD_BARRIO int, COD_DIST_SECCION int, COD_SECCION int, COD_EDAD_INT int, EspanolesHombres int, " +
"EspanolesMujeres int, ExtranjerosHombres int, ExtranjerosMujeres int"

file: String = ./padron/Rango_Edades_Seccion_202104.csv
schema: String = COD_DISTRITO int, DESC_DISTRITO string, COD_DIST_BARRIO int, DESC_BARRIO string, COD_BARRIO int, COD_DIST_SECCION int, COD_SECCION int, COD_EDAD_INT int, EspanolesHombres int, EspanolesMujeres int, ExtranjerosHombres int, ExtranjerosMujeres int


In [5]:
val df = spark.read.format("csv")
    .option("header", "true")
    .schema(schema) 
    .option("sep", ";")
    .load(file)
    .na.fill(0)

df: org.apache.spark.sql.DataFrame = [COD_DISTRITO: int, DESC_DISTRITO: string ... 10 more fields]


In [6]:
println(df.printSchema)

root
 |-- COD_DISTRITO: integer (nullable = false)
 |-- DESC_DISTRITO: string (nullable = true)
 |-- COD_DIST_BARRIO: integer (nullable = false)
 |-- DESC_BARRIO: string (nullable = true)
 |-- COD_BARRIO: integer (nullable = false)
 |-- COD_DIST_SECCION: integer (nullable = false)
 |-- COD_SECCION: integer (nullable = false)
 |-- COD_EDAD_INT: integer (nullable = false)
 |-- EspanolesHombres: integer (nullable = false)
 |-- EspanolesMujeres: integer (nullable = false)
 |-- ExtranjerosHombres: integer (nullable = false)
 |-- ExtranjerosMujeres: integer (nullable = false)

()


In [7]:
df.select("ExtranjerosHombres").show(5)

+------------------+
|ExtranjerosHombres|
+------------------+
|                 0|
|                 0|
|                 0|
|                 0|
|                 0|
+------------------+
only showing top 5 rows



In [64]:
df.show(5)

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|CENTRO              |            101|PALACIO             |         1|            1006|          6|          97|               1|               0|                 0|                 0|
|           1|CENTRO              |            101|PALACIO             |         1|            1006|          6|          98|               1|               1|                 0|                 0|
|         

In [71]:
df.select("desc_barrio").distinct.show(500, false)

+--------------------+
|desc_barrio         |
+--------------------+
|EMBAJADORES         |
|SANTA EUGENIA       |
|LEGAZPI             |
|DELICIAS            |
|CIUDAD UNIVERSITARIA|
|CONCEPCION          |
|VENTAS              |
|IMPERIAL            |
|SAN FERMIN          |
|TRAFALGAR           |
|ESTRELLA            |
|PUERTA BONITA       |
|ALUCHE              |
|ADELFAS             |
|PUEBLO NUEVO        |
|PALOMAS             |
|CORRALEJOS          |
|VALDEZARZA          |
|LOS CARMENES        |
|PUERTA DEL ANGEL    |
|ORCASITAS           |
|OPA?EL              |
|SAN ISIDRO          |
|ZOFIO               |
|LOS JERONIMOS       |
|ARCOS               |
|QUINTANA            |
|AEROPUERTO          |
|FUENTELARREINA      |
|VALLEHERMOSO        |
|CANILLAS            |
|PALACIO             |
|ALMENDRALES         |
|SAN CRISTOBAL       |
|CORTES              |
|CASTELLANA          |
|EL SALVADOR         |
|CHOPERA             |
|SAN JUAN BAUTISTA   |
|CUATRO CAMINOS      |
|VALVERDE  

In [8]:
df.createOrReplaceTempView("padron")

In [9]:
spark.sql("""select count(distinct desc_barrio) from padron""").show()

+---------------------------+
|count(DISTINCT desc_barrio)|
+---------------------------+
|                        132|
+---------------------------+



In [10]:
val df2 = df.withColumn("longitud", length(col("desc_distrito")))

df2.select("longitud").show()

+--------+
|longitud|
+--------+
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
|      20|
+--------+
only showing top 20 rows



df2: org.apache.spark.sql.DataFrame = [COD_DISTRITO: int, DESC_DISTRITO: string ... 11 more fields]


In [11]:
val df3 = df.withColumn("Valor 5", lit("5"))

df3.show()

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+-------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|Valor 5|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+-------+
|           1|CENTRO              |            101|PALACIO             |         1|            1006|          6|          97|               1|               0|                 0|                 0|      5|
|           1|CENTRO              |            101|PALACIO             |         1|            1006|          6|          98|               1|               1|                 

df3: org.apache.spark.sql.DataFrame = [COD_DISTRITO: int, DESC_DISTRITO: string ... 11 more fields]


In [88]:
df.write.partitionBy("DESC_DISTRITO","DESC_BARRIO").mode("overwrite").saveAsTable("partitioned")

In [89]:
df.write.format("csv").partitionBy("DESC_DISTRITO","DESC_BARRIO").mode("overwrite")
    .save("./particion")

In [12]:
val file_part = "./particion"



val df_part = spark.read.format("csv")
    .option("header", "true")
    .schema(schema) 
    .option("sep", ";")
    .load(file_part)
    .na.fill(0)

file_part: String = ./particion
df_part: org.apache.spark.sql.DataFrame = [COD_DISTRITO: int, COD_DIST_BARRIO: int ... 10 more fields]


In [93]:
df_part.cache()

res63: df_part.type = [COD_DISTRITO: int, COD_DIST_BARRIO: int ... 10 more fields]


In [104]:
df_part.groupBy("desc_distrito", "desc_barrio")
    .agg(sum("espanoleshombres"), sum("espanolesmujeres"), 
    sum("extranjeroshombres"), sum("extranjerosmujeres"))
    .orderBy("sum(extranjerosmujeres)", "sum(extranjeroshombres)")
    .show()

+--------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|       desc_distrito|         desc_barrio|sum(espanoleshombres)|sum(espanolesmujeres)|sum(extranjeroshombres)|sum(extranjerosmujeres)|
+--------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|CARABANCHEL         |VISTA ALEGRE        |                    0|                    0|                      0|                      0|
|MORATALAZ           |VINATEROS           |                    0|                    0|                      0|                      0|
|SAN BLAS-CANILLEJAS |SIMANCAS            |                    0|                    0|                      0|                      0|
|CARABANCHEL         |PUERTA BONITA       |                    0|                    0|                      0|                      0|
|ARGANZUELA          |LEGAZPI             |     

In [105]:
df_part.unpersist()

res73: df_part.type = [COD_DISTRITO: int, COD_DIST_BARRIO: int ... 10 more fields]


In [106]:
df.groupBy("desc_distrito", "desc_barrio")
    .agg(sum("espanoleshombres"), sum("espanolesmujeres"), 
    sum("extranjeroshombres"), sum("extranjerosmujeres"))
    .orderBy("sum(extranjerosmujeres)", "sum(extranjeroshombres)")
    .show()

+--------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|       desc_distrito|         desc_barrio|sum(espanoleshombres)|sum(espanolesmujeres)|sum(extranjeroshombres)|sum(extranjerosmujeres)|
+--------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+
|LATINA              |BARRIOS EN EDIF. BDC|                    0|                    1|                      0|                      0|
|ARGANZUELA          |ATOCHA              |                  693|                  705|                     34|                     59|
|FUENCARRAL-EL PARDO |EL PARDO            |                 1657|                 1624|                     71|                     93|
|CIUDAD LINEAL       |ATALAYA             |                  604|                  794|                     90|                    104|
|FUENCARRAL-EL PARDO |FUENTELARREINA      |     

In [13]:
val nuevoDF = df.groupBy("desc_distrito", "desc_barrio")
    .agg(sum("espanoleshombres") as "espanoleshombres")

nuevoDF: org.apache.spark.sql.DataFrame = [desc_distrito: string, desc_barrio: string ... 1 more field]


In [14]:
nuevoDF.show()

+--------------------+--------------------+----------------+
|       desc_distrito|         desc_barrio|espanoleshombres|
+--------------------+--------------------+----------------+
|TETUAN              |VALDEACEDERAS       |            9552|
|CHAMARTIN           |CIUDAD JARDIN       |            7305|
|ARGANZUELA          |DELICIAS            |           11743|
|MORATALAZ           |VINATEROS           |            6470|
|CARABANCHEL         |VISTA ALEGRE        |           16016|
|SAN BLAS-CANILLEJAS |SIMANCAS            |           10737|
|CARABANCHEL         |PUERTA BONITA       |           12565|
|ARGANZUELA          |LEGAZPI             |            8879|
|VICALVARO           |VALDEBERNARDO       |            7720|
|ARGANZUELA          |ATOCHA              |             693|
|CHAMBERI            |RIOS ROSAS          |           10541|
|MORATALAZ           |FONTARRON           |            6617|
|CIUDAD LINEAL       |SAN JUAN BAUTISTA   |            5213|
|VILLA DE VALLECAS   |CA

In [15]:
nuevoDF.join(df, df("desc_distrito")===nuevoDF("desc_distrito") && df("desc_barrio")===nuevoDF("desc_barrio")).show()

+--------------------+--------------------+----------------+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|       desc_distrito|         desc_barrio|espanoleshombres|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+--------------------+--------------------+----------------+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|TETUAN              |VALDEACEDERAS       |            9552|           6|TETUAN              |            605|VALDEACEDERAS       |         5|            6102|        102|          90|               3|               1|           

In [114]:
spark.sql("""DROP TABLE IF EXISTS padron_tbl;""")

res81: org.apache.spark.sql.DataFrame = []


In [24]:
val pivotDF = df.filter(col("desc_distrito").contains("BARAJAS").alias("BARAJAS") ||
                       col("desc_distrito").contains("CENTRO") ||
                       col("desc_distrito").contains("RETIRO"))
    //.groupBy("desc_distrito", "cod_edad_int")
    .groupBy("cod_edad_int")
    .pivot("desc_distrito")
    .agg(sum("espanolesmujeres") as "espanolesmujeres")
    .na.fill(0)
    

pivotDF.show()

+------------+--------------------+--------------------+--------------------+
|cod_edad_int|BARAJAS             |CENTRO              |RETIRO              |
+------------+--------------------+--------------------+--------------------+
|          31|                 191|                 931|                 615|
|          85|                  95|                 354|                 535|
|          65|                 212|                 656|                 951|
|          53|                 342|                 754|                 788|
|          78|                 187|                 451|                 654|
|         108|                   1|                   1|                   1|
|          34|                 250|                 883|                 645|
|         101|                   5|                  14|                  19|
|          81|                 159|                 364|                 512|
|          28|                 172|                 863|        

pivotDF: org.apache.spark.sql.DataFrame = [cod_edad_int: int, BARAJAS             : bigint ... 2 more fields]


In [26]:
val pivotDF2 = pivotDF.withColumn("porcentajeBarajas", col("BARAJAS             ")/
                                  (col("BARAJAS             ")
                                   +col("CENTRO              ")
                                   +col("RETIRO              "))*100)
.withColumn("porcentajeCentro", col("CENTRO              ")/(col("BARAJAS             ")
                                   +col("CENTRO              ")
                                   +col("RETIRO              "))*100)
.withColumn("porcentajeRetiro", col("RETIRO              ")/(col("BARAJAS             ")
                                   +col("CENTRO              ")
                                   +col("RETIRO              "))*100)

pivotDF2.show()

+------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+
|cod_edad_int|BARAJAS             |CENTRO              |RETIRO              | porcentajeBarajas|  porcentajeCentro|  porcentajeRetiro|
+------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+
|          31|                 191|                 931|                 615|10.995970063327576| 53.59815774323546| 35.40587219343696|
|          85|                  95|                 354|                 535| 9.654471544715447| 35.97560975609756| 54.36991869918699|
|          65|                 212|                 656|                 951|11.654755360087961|36.063771302913686| 52.28147333699835|
|          53|                 342|                 754|                 788|18.152866242038215| 40.02123142250531|41.825902335456476|
|          78|                 187|                 451

pivotDF2: org.apache.spark.sql.DataFrame = [cod_edad_int: int, BARAJAS             : bigint ... 5 more fields]


In [27]:
df.write.format("parquet").partitionBy("DESC_DISTRITO","DESC_BARRIO").mode("overwrite")
    .save("./particion2")