# ETL con Spark 
---

## Spark Session

In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

In [3]:
# Config
spark = (
    SparkSession
    .builder
    .appName("Vlc_social")
    .master("local[*]")
    .config("spark.ui.port", "4040")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
)

spark

## Cleaning datasets

### Dataset "Poblacion_municipal.csv"

In [None]:
df_poblacion = spark.read.csv(
    "./data_vlc/Poblacion_municipal.csv",
    header=True,
    inferSchema=True,
    sep=";",
    encoding="ISO-8859-1"
)
# drop cols n change name
df_poblacion = df_poblacion.drop("Sexo", "Periodo")
df_poblacion = df_poblacion.withColumnRenamed("Total", "Poblacion_total")
# remove dots from "Poblacion_total" col
df_poblacion = df_poblacion.withColumn("Poblacion_total", F.regexp_replace(F.col("Poblacion_total"), "\\.", ""))
# change string to float
df_poblacion = df_poblacion.withColumn("Poblacion_total", F.col("Poblacion_total").cast(FloatType()))

df_poblacion.show(5)
df_poblacion.printSchema()

+--------------------+---------------+
|          Municipios|Poblacion_total|
+--------------------+---------------+
|        46001 Ademuz|         1015.0|
|          46002 Ador|         1755.0|
|      46004 Agullent|         2438.0|
|46042 Aielo de Ma...|         4624.0|
|46043 Aielo de Rugat|          160.0|
+--------------------+---------------+
only showing top 5 rows

root
 |-- Municipios: string (nullable = true)
 |-- Poblacion_total: float (nullable = true)



### Dataset "Renta_media.csv"

In [120]:
df_renta = spark.read.csv(
    "./data_vlc/Renta_media.csv",
    header=True,
    inferSchema=True,
    sep=";",
    encoding="ISO-8859-1"
)
# drop cols n change name
df_renta = df_renta.withColumnRenamed("Total", "Renta_media")
df_renta = df_renta.drop("Indicadores de renta media y mediana", "Distritos", "Secciones", "Periodo")
# remove dots from "Renta_media" col
df_renta = df_renta.withColumn("Renta_media", F.regexp_replace(F.col("Renta_media"), "\\.", ""))
# change string to float
df_renta = df_renta.withColumn("Renta_media", F.col("Renta_media").cast(FloatType()))

df_renta.show(5)
df_renta.printSchema()

+--------------------+-----------+
|          Municipios|Renta_media|
+--------------------+-----------+
|        46001 Ademuz|    12544.0|
|          46002 Ador|    13447.0|
|46003 Atzeneta d'...|    12812.0|
|      46004 Agullent|    12589.0|
|       46005 Alaquàs|    12930.0|
+--------------------+-----------+
only showing top 5 rows

root
 |-- Municipios: string (nullable = true)
 |-- Renta_media: float (nullable = true)



### Dataset "Paro_por_municipios.csv"

In [122]:
df_paro = spark.read.csv(
    "./data_vlc/Paro_por_municipios.csv",
    header=True,
    inferSchema=True,
    sep=";",
    encoding="ISO-8859-1"
)

# only keep valencia
df_paro = df_paro.filter(df_paro["Comunidad Autónoma"] == "Comunitat Valenciana")
# only from one month
df_paro = df_paro.filter(df_paro["Código mes "] == 202501)
# drop unnecessary info
df_paro = df_paro.drop("Código mes ", "mes", "Código de CA", "Comunidad Autónoma", "Codigo Provincia", "Provincia", " Municipio")
# drop last column with the index (odd name)
df_paro = df_paro.drop(df_paro.columns[-1])

# rename cols to snake case
df_paro = df_paro.withColumnRenamed("Codigo Municipio", "Codigo_municipio") \
    .withColumnRenamed("total Paro Registrado", "Total_paro_registrado") \
    .withColumnRenamed("Paro hombre edad < 25", "Paro_hombre_menor_25") \
    .withColumnRenamed("Paro hombre edad 25 -45 ", "Paro_hombre_25_45") \
    .withColumnRenamed("Paro hombre edad >=45", "Paro_hombre_45+") \
    .withColumnRenamed("Paro mujer edad < 25", "Paro_mujer_menor_25") \
    .withColumnRenamed("Paro mujer edad 25 -45 ", "Paro_mujer_25_45") \
    .withColumnRenamed("Paro mujer edad >=45", "Paro_mujer_45+") \
    .withColumnRenamed("Paro Agricultura", "Paro_agricultura") \
    .withColumnRenamed("Paro Industria", "Paro_industria") \
    .withColumnRenamed("Paro Construcción", "Paro_construccion") \
    .withColumnRenamed("Paro Servicios", "Paro_servicios") 

# change all the "<5" to 3 and make them all float
for col in df_paro.columns:
    df_paro = df_paro.withColumn(
        col,
        F.when(F.trim(F.col(col)) == "<5", 3)  # trim spaces
         .otherwise(F.regexp_replace(F.col(col), "\\.", ""))  # remove dots
         .cast("float")  # cast to float
    )

# leave codigo as an int
df_paro = df_paro.withColumn("Codigo_municipio", F.col("Codigo_municipio").cast("int"))

df_paro.show()
df_paro.printSchema()

+----------------+---------------------+--------------------+-----------------+---------------+-------------------+----------------+--------------+----------------+--------------+-----------------+--------------+
|Codigo_municipio|Total_paro_registrado|Paro_hombre_menor_25|Paro_hombre_25_45|Paro_hombre_45+|Paro_mujer_menor_25|Paro_mujer_25_45|Paro_mujer_45+|Paro_agricultura|Paro_industria|Paro_construccion|Paro_servicios|
+----------------+---------------------+--------------------+-----------------+---------------+-------------------+----------------+--------------+----------------+--------------+-----------------+--------------+
|            3001|                 19.0|                 0.0|              3.0|            5.0|                0.0|             5.0|           5.0|             3.0|           3.0|              3.0|          12.0|
|            3002|                339.0|                11.0|             40.0|           85.0|               10.0|            76.0|         117.0| 

## Joining datasets

In [125]:
# join df_poblacion and df_renta dataframes on "municipio" col 
df_social = df_poblacion.join(df_renta, on=["Municipios"], how="inner")
# separate municipios into "codigo_municipio" and "Municipio"
df_social = df_social.withColumn("Codigo_municipio", F.split(F.col("Municipios"), " ").getItem(0))
df_social = df_social.withColumn("Municipios", F.split(F.col("Municipios"), " ").getItem(1))
# join df_social with df_paro on "codigo_municipio"
df_social = df_social.join(df_paro, on=["Codigo_municipio"], how="inner")
# codigo municipio to int
df_social = df_social.withColumn("Codigo_municipio", F.col("Codigo_municipio").cast("int"))

df_social.show()
df_social.printSchema()

+----------------+-----------------+---------------+-----------+---------------------+--------------------+-----------------+---------------+-------------------+----------------+--------------+----------------+--------------+-----------------+--------------+
|Codigo_municipio|       Municipios|Poblacion_total|Renta_media|Total_paro_registrado|Paro_hombre_menor_25|Paro_hombre_25_45|Paro_hombre_45+|Paro_mujer_menor_25|Paro_mujer_25_45|Paro_mujer_45+|Paro_agricultura|Paro_industria|Paro_construccion|Paro_servicios|
+----------------+-----------------+---------------+-----------+---------------------+--------------------+-----------------+---------------+-------------------+----------------+--------------+----------------+--------------+-----------------+--------------+
|           46001|           Ademuz|         1015.0|    12544.0|                 71.0|                 6.0|              9.0|           20.0|                3.0|            17.0|          15.0|             3.0|           8.