# Procesamiento en batch con Apache Spark
## 1. Cargado de información
La información que será usada en este ejercicio, se obtuvo directamente de Datos Abiertos de Colombia [Entrada de extranjeros a Colombia desde 2012](https://www.datos.gov.co/Estad-sticas-Nacionales/Entradas-de-extranjeros-a-Colombia/96sh-4v8d/about_data).

In [1]:
# Inicializamos una instancia de Apache Spark, importanto librerias y usando SparkSession.builder para dar los datos de conexión
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when, sum, lpad, length
spark = SparkSession.builder \
    .appName("ProcesamientoY_EDA_Reseñas") \
    .master("local[*]") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/23 19:27:15 WARN Utils: Your hostname, kevocde-Sword-15-A12VF, resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface wlo1)
25/10/23 19:27:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/23 19:27:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Cargamos la información directamente desde el filesystem en formato csv
df_migrations = spark.read.csv("public/Entradas_de_extranjeros_a_Colombia_20251022.csv", header=True, inferSchema=True)
df_migrations.show(5)

+----+-----+------------+----------+--------+---------+----------+-----+--------------------+
| Año|  Mes|Nacionalidad|Codigo M49|Femenino|Masculino|Indefinido|Total|       Ubicación PCM|
+----+-----+------------+----------+--------+---------+----------+-----+--------------------+
|2012|Enero|     ALBANIA|         8|       1|        2|      NULL|    3|(4.697144,-74.140...|
|2012|Enero|    ALEMANIA|       276|       1|        1|      NULL|    2|(0.25129,-76.875963)|
|2012|Enero|    ALEMANIA|       276|      63|      102|      NULL|  165|(0.814836,-77.662...|
|2012|Enero|    ALEMANIA|       276|      22|       23|      NULL|   45|(10.408582,-75.53...|
|2012|Enero|    ALEMANIA|       276|      27|       48|      NULL|   75|(10.445761,-75.51...|
+----+-----+------------+----------+--------+---------+----------+-----+--------------------+
only showing top 5 rows


## 2. Limpieza, transformación y análisis exploratorio
Una vez cargada la información, spark nos dará un dataframe por defecto, así que este será el mecanismo por el cual se manipulará la información para el presente ejercicio. Ahora, procederemos a hacer una limpieza y transformación para finalizar con un EDA.

In [3]:
# El siguiente código se hizo en una sola instrucción por facilidad, pero lo explico aquí mismo
#  1. drop: Se hace una limpieza de columnas con data no relevante para nuestro análisis
#  2. withColumnsRenamed: Se modifica el nombre de las columnas existentes por mejorar el typo y por estandarizar el lenguaje.
#  3. withColumn: Se limpia la columna de month, para que los datos sean el número del més no su nombre, por facilidad de entendimiento.
#  4. withColumn: Se aplican filtros a la columna gender_unknown, aunque al parecer no hay ningún dato.
#  5. withColum: Se rellenan los datos de nationality para que cumplan el estandar m49
#  6. withColumns: se aplica un replace en las columnas numéricas, ya que los datos tienen comas en las cifras de miles.
#  7. withColumns: Se hace un cast en las columnas de tipo entero, para que sean fácilmente operables más adelante.
#  8. filter*3: Se hacen los filtros necesarios para eliminar datos erroneos.
df_migrations = df_migrations \
    .drop("Nacionalidad", "Ubicación PCM") \
    .withColumnsRenamed({
        "Año": "year",
        "Mes": "month",
        "Total": "total",
        "Codigo M49": "nationality",
        "Femenino": "gender_female",
        "Masculino": "gender_male",
        "Indefinido": "gender_unknown",
    }) \
    .withColumn("month",
        when(col("month") == "Enero", 1)
        .when(col("month") == "Febrero", 2)
        .when(col("month") == "Marzo", 3)
        .when(col("month") == "Abril", 4)
        .when(col("month") == "Mayo", 5)
        .when(col("month") == "Junio", 6)
        .when(col("month") == "Julio", 7)
        .when(col("month") == "Agosto", 8)
        .when(col("month") == "Septiembre", 9)
        .when(col("month") == "Octubre", 10)
        .when(col("month") == "Noviembre", 11)
        .when(col("month") == "Diciembre", 12)
        .otherwise(None)
    ) \
    .withColumn("gender_unknown",
        when(col("gender_unknown").isNull(), 0)
        .otherwise(col("gender_unknown"))
    ) \
    .withColumn("nationality", lpad(col("nationality"), 3, "0")) \
    .withColumns({
        "gender_female": regexp_replace(col("gender_female"), ",", ""),
        "gender_male": regexp_replace(col("gender_male"), ",", ""),
        "gender_unknown": regexp_replace(col("gender_unknown"), ",", ""),
        "total": regexp_replace(col("total"), ",", ""),
    }) \
    .withColumns({
        "gender_female": col("gender_female").cast("int"),
        "gender_male": col("gender_male").cast("int"),
        "gender_unknown": col("gender_unknown").cast("int"),
        "total": col("total").cast("int"),
    }) \
    .filter(col("total") > 0) \
    .filter(col("month").isNotNull()) \
    .filter(length(col("nationality")) <= 3)

df_migrations.show(5)

+----+-----+-----------+-------------+-----------+--------------+-----+
|year|month|nationality|gender_female|gender_male|gender_unknown|total|
+----+-----+-----------+-------------+-----------+--------------+-----+
|2012|    1|        008|            1|          2|             0|    3|
|2012|    1|        276|            1|          1|             0|    2|
|2012|    1|        276|           63|        102|             0|  165|
|2012|    1|        276|           22|         23|             0|   45|
|2012|    1|        276|           27|         48|             0|   75|
+----+-----+-----------+-------------+-----------+--------------+-----+
only showing top 5 rows


### Análisis EDA
Requerimos analizar, primero el esquema, las columnas de género y la agrupación por años, por meses, por años nacionalidad ...

In [4]:
# Imprimimos el esquema
df_migrations.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- nationality: string (nullable = true)
 |-- gender_female: integer (nullable = true)
 |-- gender_male: integer (nullable = true)
 |-- gender_unknown: integer (nullable = true)
 |-- total: integer (nullable = true)



In [5]:
# Analizamos las columnas de género
df_migrations.describe(["gender_female"]).show()
df_migrations.describe(["gender_male"]).show()
df_migrations.describe(["gender_unknown"]).show()

+-------+------------------+
|summary|     gender_female|
+-------+------------------+
|  count|            164908|
|   mean|103.09747253013802|
| stddev| 715.4209251755556|
|    min|                 0|
|    max|             39366|
+-------+------------------+

+-------+----------------+
|summary|     gender_male|
+-------+----------------+
|  count|          164908|
|   mean|138.576090911296|
| stddev|926.006684973606|
|    min|               0|
|    max|           47999|
+-------+----------------+

+-------+--------------+
|summary|gender_unknown|
+-------+--------------+
|  count|        164908|
|   mean|           0.0|
| stddev|           0.0|
|    min|             0|
|    max|             0|
+-------+--------------+



In [6]:
# Revisaremos ahora los datos por agrupaciones
#  Aquí vemos la relación del total de migrantes por año en orden ascendente:
#   Claramente vemos una disminución de migrantes de todas las nacionalidades en época de pandemia.
df_migrations.groupBy("year").agg(sum("total").alias("total")).sort("year").show()

+----+-------+
|year|  total|
+----+-------+
|2012|1698888|
|2013|1841365|
|2014|2055393|
|2015|2385705|
|2016|2697613|
|2017|3341928|
|2018|4147444|
|2019|3982183|
|2020| 999157|
|2021|1519490|
|2022|3507293|
|2023|4337114|
|2024|4844326|
|2025|2497788|
+----+-------+



In [7]:
# Aquí vemos el total de migrantes por nacionalidad, la nacionalidad está en formato m49:
#  Pero vemos en primer lugar a Estados Unidos (que ironía), luego vemos a Venezuela (que novedad), luego Mexico etc.
df_migrations.groupBy("nationality").agg(sum("total").alias("total")).sort("total", ascending=False).show()

+-----------+-------+
|nationality|  total|
+-----------+-------+
|        840|8461881|
|        862|7012035|
|        484|2424909|
|        218|2381632|
|        604|1915423|
|        076|1821977|
|        032|1702233|
|        724|1615736|
|        152|1602198|
|        591|1252255|
|        250| 858399|
|        188| 820179|
|        124| 784573|
|        276| 774184|
|        528| 696592|
|        214| 581009|
|        826| 558177|
|        380| 538852|
|        320| 340336|
|        222| 316584|
+-----------+-------+
only showing top 20 rows


## 3. Finalizamos con el almacenado de los datos procesados
Almacenaremos tan solo los datos filtrados.

In [8]:
df_migrations.write.mode("overwrite").parquet("migration_data")

                                                                                