# **EVALUACIÓN FINAL**: ANÁLISIS DE MOVIMIENTOS MIGRATORIOS CON SPARK

## Descripción
Eres parte de un equipo de analistas de datos encargado de estudiar las tendencias de migración humana en el siglo XXI utilizando Big Data. Para ello, trabajarás con un conjunto de datos que contiene información sobre migraciones entre distintos países, sus causas y el impacto socioeconómico en las regiones de origen y destino.

## Objetivos de la actividad
1. Aplicar conceptos de Big Data utilizando Apache Spark y PySpark.
2. Explorar y transformar datos con RDDs y DataFrames.
3. Realizar consultas con Spark SQL.
4. Implementar modelos de aprendizaje automático con MLlib.

## Instrucciones

1. Carga y exploración de datos (2 puntos)
   - Carga el dataset proporcionado en Spark.
   - Convierte los datos en un RDD y un DataFrame.
   - Explora los datos: muestra las primeras filas, el esquema y genera estadísticas descriptivas.

In [None]:
# 1.0 Instalación (solo si no tenemos PySpark en este runtime, en este caso, no lo ejecuté)
!apt-get -qq update
!apt-get -qq install -y openjdk-11-jdk
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"

# Pinned a PySpark 3.5.x (compatible con Java 11)
!pip -q install pyspark==3.5.1

In [None]:
# 1.1 Cargamos el dataset en Spark
from pyspark.sql import SparkSession

# Abrimos una SparkSession
spark = (SparkSession.builder
         .appName("Evaluacion_Final_M9_Migraciones")
         .master("local[*]")
         .getOrCreate())

# Subimos el archivo desde el computador
from google.colab import files
uploaded = files.upload()

# Cargamos el dataframe
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("migraciones.csv"))

# Vemos las filas y esquema
df.show(10, truncate=False)
df.printSchema()

# Contamos las filas
print("Total de filas:", df.count())

# Revisamos los descriptivos numéricos
df.describe().show()

# Mostramos el resumen ampliado (incluye percentiles)
df.summary().show()

Saving migraciones.csv to migraciones.csv
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|ID |Origen   |Destino        |Año |Razón    |PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|1  |México   |EEUU           |2015|Económica|8900      |62000      |5.2                  |3.8                   |8.5                   |12.3                   |125000000       |331000000        |
|2  |Siria    |Alemania       |2016|Conflicto|2500      |45000      |15.4                 |4.5                   |6.2                   |14.1                   |18000000 

In [2]:
# 1.2 Convertimos los datos
rdd = df.rdd

# Mostramos los primeros 3 registros como objetos Row
print(rdd.take(3))

# Vemos encabezados detectados y tipos de columna
for c in df.dtypes:
    print(c)

[Row(ID=1, Origen='México', Destino='EEUU', Año=2015, Razón='Económica', PIB_Origen=8900, PIB_Destino=62000, Tasa_Desempleo_Origen=5.2, Tasa_Desempleo_Destino=3.8, Nivel_Educativo_Origen=8.5, Nivel_Educativo_Destino=12.3, Población_Origen=125000000, Población_Destino=331000000), Row(ID=2, Origen='Siria', Destino='Alemania', Año=2016, Razón='Conflicto', PIB_Origen=2500, PIB_Destino=45000, Tasa_Desempleo_Origen=15.4, Tasa_Desempleo_Destino=4.5, Nivel_Educativo_Origen=6.2, Nivel_Educativo_Destino=14.1, Población_Origen=18000000, Población_Destino=83000000), Row(ID=3, Origen='Venezuela', Destino='Colombia', Año=2017, Razón='Política', PIB_Origen=6000, PIB_Destino=15000, Tasa_Desempleo_Origen=14.8, Tasa_Desempleo_Destino=10.1, Nivel_Educativo_Origen=9.3, Nivel_Educativo_Destino=11.0, Población_Origen=28000000, Población_Destino=50000000)]
('ID', 'int')
('Origen', 'string')
('Destino', 'string')
('Año', 'int')
('Razón', 'string')
('PIB_Origen', 'int')
('PIB_Destino', 'int')
('Tasa_Desempleo_

2. Procesamiento de datos con RDDs y DataFrames (3 puntos)
   - Aplica transformaciones sobre los RDDs (filter, map, flatMap).
   - Aplica acciones sobre los RDDs (collect, take, count).
   - Realiza operaciones con DataFrames: filtrado, agregaciones y ordenamiento.
   - Escribe los resultados en formato Parquet.

In [3]:
# 2.1 Procesamiento de datos
from pyspark.sql import functions as F

# RDD transformaciones
rdd_filtrado = rdd.filter(lambda row: row.Tasa_Desempleo_Origen is not None and row.Tasa_Desempleo_Origen > 10)

# Pares (Origen, 1) para contar registros por país de origen en condición "estresada"
rdd_pairs = rdd_filtrado.map(lambda row: (row.Origen, 1))
rdd_conteo_origen = rdd_pairs.reduceByKey(lambda a, b: a + b)

# Ejemplo de flatMap sobre Razón (tokenizamos por espacios, útil si hubiera frases múltiples)
rdd_tokens_razon = (df.select("Razón").rdd
                      .flatMap(lambda row: row["Razón"].lower().split()))  # genera un RDD de palabras

# Acciones
print("Total filas con desempleo_origen > 10%:", rdd_filtrado.count())
print("Top 5 países de origen (condición >10%):", rdd_conteo_origen.take(5))
print("Tokens únicos en 'Razón':", rdd_tokens_razon.distinct().collect())

Total filas con desempleo_origen > 10%: 2
Top 5 países de origen (condición >10%): [('Siria', 1), ('Venezuela', 1)]
Tokens únicos en 'Razón': ['económica', 'conflicto', 'política', 'laboral']


In [4]:
# 2.2 Realizmos operaciones con DataFrame

# Top destinos por PIB_Destino promedio
df_top_destinos_pib = (df.groupBy("Destino")
                       .agg(F.avg("PIB_Destino").alias("PIB_Destino_prom"))
                       .orderBy(F.col("PIB_Destino_prom").desc()))
df_top_destinos_pib.show(truncate=False)

# Contamos flujos por Origen desde 2016
df_conteo_origen_2016 = (df.filter(F.col("Año") >= 2016)
                           .groupBy("Origen")
                           .agg(F.count("*").alias("n_flujos"))
                           .orderBy(F.col("n_flujos").desc(), F.col("Origen").asc()))
df_conteo_origen_2016.show(truncate=False)

# Ordenamos por Año (desc) mostrando columnas clave
df_ordenado = df.select("ID","Origen","Destino","Año","Razón","PIB_Origen","PIB_Destino") \
                .orderBy(F.col("Año").desc())
df_ordenado.show(truncate=False)

+---------------+----------------+
|Destino        |PIB_Destino_prom|
+---------------+----------------+
|EEUU           |62000.0         |
|Alemania       |45000.0         |
|Emiratos Árabes|43000.0         |
|España         |34000.0         |
|Colombia       |15000.0         |
+---------------+----------------+

+---------+--------+
|Origen   |n_flujos|
+---------+--------+
|Argentina|1       |
|India    |1       |
|Siria    |1       |
|Venezuela|1       |
+---------+--------+

+---+---------+---------------+----+---------+----------+-----------+
|ID |Origen   |Destino        |Año |Razón    |PIB_Origen|PIB_Destino|
+---+---------+---------------+----+---------+----------+-----------+
|5  |Argentina|España         |2019|Económica|10000     |34000      |
|4  |India    |Emiratos Árabes|2018|Laboral  |2200      |43000      |
|3  |Venezuela|Colombia       |2017|Política |6000      |15000      |
|2  |Siria    |Alemania       |2016|Conflicto|2500      |45000      |
|1  |México   |EEUU      

In [5]:
# 2.3 Escribe en formato Parquemet

#Directorios de salida (se sobreescriben para que puedas re-ejecutar)
out1 = "parquet/top_destinos_pib"
out2 = "parquet/conteo_origen_2016"
out3 = "parquet/ordenado_por_anio"

(df_top_destinos_pib.coalesce(1)
    .write.mode("overwrite").parquet(out1))

(df_conteo_origen_2016.coalesce(1)
    .write.mode("overwrite").parquet(out2))

(df_ordenado.coalesce(1)
    .write.mode("overwrite").parquet(out3))

print("Parquet escritos en:", out1, "|", out2, "|", out3)

Parquet escritos en: parquet/top_destinos_pib | parquet/conteo_origen_2016 | parquet/ordenado_por_anio


3. Consultas con Spark SQL (2 puntos)
   - Registra el DataFrame como una tabla temporal.
   - Realiza consultas sobre los principales países de origen y destino.
   - Analiza las principales razones de migración por región.

In [6]:
# 3.1 Registramos el DataFrame en una vista temporal
df.createOrReplaceTempView("migraciones")

# Top destinos por PIB_Destino promedio
sql_top_destinos = spark.sql("""
SELECT Destino,
       AVG(PIB_Destino) AS PIB_Destino_prom
FROM migraciones
GROUP BY Destino
ORDER BY PIB_Destino_prom DESC
""")
sql_top_destinos.show(truncate=False)

# Conteo de flujos por Origen desde 2016
sql_conteo_origen_2016 = spark.sql("""
SELECT Origen,
       COUNT(*) AS n_flujos
FROM migraciones
WHERE `Año` >= 2016
GROUP BY Origen
ORDER BY n_flujos DESC, Origen ASC
""")
sql_conteo_origen_2016.show(truncate=False)

# Razones de migración por destino
sql_razon_por_destino = spark.sql("""
SELECT Destino,
       `Razón`,
       COUNT(*) AS n
FROM migraciones
GROUP BY Destino, `Razón`
ORDER BY n DESC, Destino ASC
""")
sql_razon_por_destino.show(truncate=False)

+---------------+----------------+
|Destino        |PIB_Destino_prom|
+---------------+----------------+
|EEUU           |62000.0         |
|Alemania       |45000.0         |
|Emiratos Árabes|43000.0         |
|España         |34000.0         |
|Colombia       |15000.0         |
+---------------+----------------+

+---------+--------+
|Origen   |n_flujos|
+---------+--------+
|Argentina|1       |
|India    |1       |
|Siria    |1       |
|Venezuela|1       |
+---------+--------+

+---------------+---------+---+
|Destino        |Razón    |n  |
+---------------+---------+---+
|Alemania       |Conflicto|1  |
|Colombia       |Política |1  |
|EEUU           |Económica|1  |
|Emiratos Árabes|Laboral  |1  |
|España         |Económica|1  |
+---------------+---------+---+



In [7]:
# 3.2 Principales paises origen destino
sql_indicadores = spark.sql("""
SELECT
  ID, Origen, Destino, `Año`, `Razón`,
  PIB_Origen, PIB_Destino,
  Tasa_Desempleo_Origen, Tasa_Desempleo_Destino,
  Nivel_Educativo_Origen, Nivel_Educativo_Destino,
  (PIB_Destino * 1.0 / PIB_Origen)            AS Multiplicador_PIB,
  (Tasa_Desempleo_Origen - Tasa_Desempleo_Destino) AS Gap_Desempleo
FROM migraciones
ORDER BY `Año` DESC
""")
sql_indicadores.show(truncate=False)

+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+-----------------+-------------------+
|ID |Origen   |Destino        |Año |Razón    |PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Multiplicador_PIB|Gap_Desempleo      |
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+-----------------+-------------------+
|5  |Argentina|España         |2019|Económica|10000     |34000      |9.5                  |13.2                  |11.5                  |13.8                   |3.400000000000   |-3.6999999999999993|
|4  |India    |Emiratos Árabes|2018|Laboral  |2200      |43000      |7.2                  |1.8                   |7.8                   |13.0                   |19.545454545455  |5.4                |


In [8]:
# 3.3 Principales razones de migración por región
out_sql1 = "parquet_sql/top_destinos_pib"
out_sql2 = "parquet_sql/conteo_origen_2016"
out_sql3 = "parquet_sql/razon_por_destino"
out_sql4 = "parquet_sql/indicadores"

sql_top_destinos.coalesce(1).write.mode("overwrite").parquet(out_sql1)
sql_conteo_origen_2016.coalesce(1).write.mode("overwrite").parquet(out_sql2)
sql_razon_por_destino.coalesce(1).write.mode("overwrite").parquet(out_sql3)
sql_indicadores.coalesce(1).write.mode("overwrite").parquet(out_sql4)

print("Parquet SQL escritos en:", out_sql1, "|", out_sql2, "|", out_sql3, "|", out_sql4)

Parquet SQL escritos en: parquet_sql/top_destinos_pib | parquet_sql/conteo_origen_2016 | parquet_sql/razon_por_destino | parquet_sql/indicadores


4. Aplicación de MLlib para predicción de flujos migratorios (3 puntos)
   - Convierte los datos en un formato adecuado para MLlib.
   - Aplica un modelo de regresión logística para predecir la probabilidad de migración basada en factores socioeconómicos.
   - Evalúa el modelo y analiza su precisión.

In [9]:
# 4.1 Convertimos los datos en un formato adecuado para MLlib
from pyspark.sql import functions as F

# Partimos del DataFrame original df
df_features = (df
  .withColumn("Multiplicador_PIB", F.col("PIB_Destino") / F.col("PIB_Origen"))
  .withColumn("Gap_Desempleo", F.col("Tasa_Desempleo_Origen") - F.col("Tasa_Desempleo_Destino"))
  .withColumn("Gap_Nivel_Educativo", F.col("Nivel_Educativo_Destino") - F.col("Nivel_Educativo_Origen"))
)

# Regla proxy de "migración atractiva" (label binaria)
df_labeled = (df_features
  .withColumn(
      "label",
      F.when(
          ((F.col("Multiplicador_PIB") > F.lit(3)) | (F.col("Gap_Desempleo") > F.lit(2))) &
          (F.col("Gap_Nivel_Educativo") >= F.lit(0)),
          F.lit(1.0)
      ).otherwise(F.lit(0.0))
  )
)

# Chequeo rápido
df_labeled.select(
    "ID","Origen","Destino","Año","Razón","Multiplicador_PIB","Gap_Desempleo","Gap_Nivel_Educativo","label"
).orderBy(F.col("Año").desc()).show(truncate=False)
print("Distribución de la etiqueta:", df_labeled.groupBy("label").count().orderBy("label").collect())

+---+---------+---------------+----+---------+------------------+-------------------+-------------------+-----+
|ID |Origen   |Destino        |Año |Razón    |Multiplicador_PIB |Gap_Desempleo      |Gap_Nivel_Educativo|label|
+---+---------+---------------+----+---------+------------------+-------------------+-------------------+-----+
|5  |Argentina|España         |2019|Económica|3.4               |-3.6999999999999993|2.3000000000000007 |1.0  |
|4  |India    |Emiratos Árabes|2018|Laboral  |19.545454545454547|5.4                |5.2                |1.0  |
|3  |Venezuela|Colombia       |2017|Política |2.5               |4.700000000000001  |1.6999999999999993 |1.0  |
|2  |Siria    |Alemania       |2016|Conflicto|18.0              |10.9               |7.8999999999999995 |1.0  |
|1  |México   |EEUU           |2015|Económica|6.966292134831461 |1.4000000000000004 |3.8000000000000007 |1.0  |
+---+---------+---------------+----+---------+------------------+-------------------+-------------------

In [10]:
# 4.2 Aplicamos un modelo de regresión
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Categóricas -> índices -> OHE
cat_cols = ["Origen", "Destino", "Razón"]
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in cat_cols]
encoders = [OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_ohe") for c in cat_cols]

# Numéricas "crudas" + derivadas
num_cols = [
    "Año","PIB_Origen","PIB_Destino",
    "Tasa_Desempleo_Origen","Tasa_Desempleo_Destino",
    "Nivel_Educativo_Origen","Nivel_Educativo_Destino",
    "Población_Origen","Población_Destino",
    "Multiplicador_PIB","Gap_Desempleo","Gap_Nivel_Educativo"
]

# Ensamblar features
assembler = VectorAssembler(
    inputCols=num_cols + [f"{c}_ohe" for c in cat_cols],
    outputCol="features",
    handleInvalid="keep"
)

# Modelo: Regresión Logística binaria
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50, regParam=0.0, elasticNetParam=0.0)

pipeline = Pipeline(stages=[*indexers, *encoders, assembler, lr])

# Split (ojo: dataset muy pequeño, esto es solo demostrativo)
train, test = df_labeled.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train)
pred_test = model.transform(test)

# Ver predicciones
pred_test.select("ID","Origen","Destino","label","probability","prediction").show(truncate=False)

+---+---------+--------+-----+-----------+----------+
|ID |Origen   |Destino |label|probability|prediction|
+---+---------+--------+-----+-----------+----------+
|3  |Venezuela|Colombia|1.0  |[0.0,1.0]  |1.0       |
+---+---------+--------+-----+-----------+----------+



In [11]:
# 4.3 Evaluamos el modelo
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# AUC (curva ROC) y Accuracy
evaluator_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

auc = evaluator_auc.evaluate(pred_test)
acc = evaluator_acc.evaluate(pred_test)

print(f"AUC en test: {auc:.3f}")
print(f"Accuracy en test: {acc:.3f}")

# Matriz de confusión
cm = (pred_test
      .groupBy("label","prediction")
      .count()
      .orderBy("label","prediction"))
cm.show()

# Coeficientes del modelo (opcional, para interpretar señales)
lr_stage = model.stages[-1]  # última etapa del pipeline
print("Intercepto:", lr_stage.intercept)
print("Nº de coeficientes:", len(lr_stage.coefficients))

AUC en test: 1.000
Accuracy en test: 1.000
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|    1|
+-----+----------+-----+

Intercepto: inf
Nº de coeficientes: 23


In [None]:
# 4.4 Analisis
# Este resultado es "demasiado perfecto" y eso es mala señal:
#  - AUC=1.0 y Accuracy=1.0 con matriz de confusión de 1 solo caso (1→1) indica que tu set de test tiene una sola fila y además solo de la clase 1.
#    Con tan pocos datos, cualquier acierto da 1.0.
#  - Intercepto = inf sugiere clase única en train o separación perfecta (las features dejan la clase 1 completamente separada). Con 5 filas y
#    muchas one-hots, es esperable.
#  - Con este escenario, las métricas no son válidas: no miden capacidad de generalización, solo reflejan que el test tenía un caso positivo y el
#    modelo lo marcó como 1.