EVALUACIÓN FINAL: ANÁLISIS DE MOVIMIENTOS MIGRATORIOS CON SPARK

Eres parte de un equipo de analistas de datos encargado de estudiar las tendencias de migración humana en el siglo XXI utilizando Big Data. Para ello, trabajarás con un conjunto de datos que contiene información sobre migraciones entre distintos países, sus causas y el impacto socioeconómico en las regiones de origen y destino.

Objetivos de la actividad

1. Aplicar conceptos de Big Data utilizando Apache Spark y PySpark.
2. Explorar y transformar datos con RDDs y DataFrames.
3. Realizar consultas con Spark SQL.
4. Implementar modelos de aprendizaje automático con MLlib.

1. Carga y exploración de datos (2 puntos)
- Carga el dataset proporcionado en Spark.
- Convierte los datos en un RDD y un DataFrame.
- Explora los datos: muestra las primeras filas, el esquema y genera estadísticas descriptivas.

In [3]:
%pip install pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("migraciones").getOrCreate()
file_path = "/content/sample_data/migraciones.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show()
df.printSchema()
df.describe().show()

rdd_migraciones = df.rdd
rdd_migraciones.take(5)

+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
| ID|   Origen|        Destino| Año|    Razón|PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|  1|   México|           EEUU|2015|Económica|      8900|      62000|                  5.2|                   3.8|                   8.5|                   12.3|       125000000|        331000000|
|  2|    Siria|       Alemania|2016|Conflicto|      2500|      45000|                 15.4|                   4.5|                   6.2|                   14.1|        18000000|         83000000|
|  3|Venezuela|

[Row(ID=1, Origen='México', Destino='EEUU', Año=2015, Razón='Económica', PIB_Origen=8900, PIB_Destino=62000, Tasa_Desempleo_Origen=5.2, Tasa_Desempleo_Destino=3.8, Nivel_Educativo_Origen=8.5, Nivel_Educativo_Destino=12.3, Población_Origen=125000000, Población_Destino=331000000),
 Row(ID=2, Origen='Siria', Destino='Alemania', Año=2016, Razón='Conflicto', PIB_Origen=2500, PIB_Destino=45000, Tasa_Desempleo_Origen=15.4, Tasa_Desempleo_Destino=4.5, Nivel_Educativo_Origen=6.2, Nivel_Educativo_Destino=14.1, Población_Origen=18000000, Población_Destino=83000000),
 Row(ID=3, Origen='Venezuela', Destino='Colombia', Año=2017, Razón='Política', PIB_Origen=6000, PIB_Destino=15000, Tasa_Desempleo_Origen=14.8, Tasa_Desempleo_Destino=10.1, Nivel_Educativo_Origen=9.3, Nivel_Educativo_Destino=11.0, Población_Origen=28000000, Población_Destino=50000000),
 Row(ID=4, Origen='India', Destino='Emiratos Árabes', Año=2018, Razón='Laboral', PIB_Origen=2200, PIB_Destino=43000, Tasa_Desempleo_Origen=7.2, Tasa_Des

2. Procesamiento de datos con RDDs y DataFrames (3 puntos)
- Aplica transformaciones sobre los RDDs (filter, map, flatMap).
- Aplica acciones sobre los RDDs (collect, take, count).
- Realiza operaciones con DataFrames: filtrado, agregaciones y ordenamiento.
- Escribe los resultados en formato Parquet.

In [17]:
rdd_filter = rdd_migraciones.filter(lambda x: x["PIB_Destino"] > 40000)
print(rdd_filter.take(5))

rdd_map = rdd_migraciones.map(lambda x: (x["Origen"], x["Destino"], x["Año"]))
print(rdd_map.take(5))

rdd_flatMap = rdd_migraciones.flatMap(lambda x: [(x["Origen"], x["Destino"], x["Año"]), (x["Destino"], x["Origen"], x["Año"])])
print(rdd_flatMap.take(5))

print(
  rdd_migraciones.collect(),
  rdd_migraciones.take(5),
  rdd_migraciones.count()
)

df_filter = df.filter(df["Tasa_Desempleo_Origen"] > 10)
df_filter.show()

df_agg = df.groupBy("Razón").count()
df_agg.show()

df_sort = df.orderBy("PIB_Origen")
df_sort.show()

df.write.parquet("migraciones.parquet")

[Row(ID=1, Origen='México', Destino='EEUU', Año=2015, Razón='Económica', PIB_Origen=8900, PIB_Destino=62000, Tasa_Desempleo_Origen=5.2, Tasa_Desempleo_Destino=3.8, Nivel_Educativo_Origen=8.5, Nivel_Educativo_Destino=12.3, Población_Origen=125000000, Población_Destino=331000000), Row(ID=2, Origen='Siria', Destino='Alemania', Año=2016, Razón='Conflicto', PIB_Origen=2500, PIB_Destino=45000, Tasa_Desempleo_Origen=15.4, Tasa_Desempleo_Destino=4.5, Nivel_Educativo_Origen=6.2, Nivel_Educativo_Destino=14.1, Población_Origen=18000000, Población_Destino=83000000), Row(ID=4, Origen='India', Destino='Emiratos Árabes', Año=2018, Razón='Laboral', PIB_Origen=2200, PIB_Destino=43000, Tasa_Desempleo_Origen=7.2, Tasa_Desempleo_Destino=1.8, Nivel_Educativo_Origen=7.8, Nivel_Educativo_Destino=13.0, Población_Origen=1380000000, Población_Destino=9800000)]
[('México', 'EEUU', 2015), ('Siria', 'Alemania', 2016), ('Venezuela', 'Colombia', 2017), ('India', 'Emiratos Árabes', 2018), ('Argentina', 'España', 2019

3. Consultas con Spark SQL (2 puntos)
- Registra el DataFrame como una tabla temporal.
- Realiza consultas sobre los principales países de origen y destino.
- Analiza las principales razones de migración por región.

In [28]:
df.createOrReplaceTempView("migraciones_tabla")
ori_des = spark.sql("SELECT Origen, Destino FROM migraciones_tabla")
ori_des.show()

razon_reg = spark.sql("SELECT `Razón`, Origen, Destino FROM migraciones_tabla")
razon_reg.show()

+---------+---------------+
|   Origen|        Destino|
+---------+---------------+
|   México|           EEUU|
|    Siria|       Alemania|
|Venezuela|       Colombia|
|    India|Emiratos Árabes|
|Argentina|         España|
+---------+---------------+

+---------+---------+---------------+
|    Razón|   Origen|        Destino|
+---------+---------+---------------+
|Económica|   México|           EEUU|
|Conflicto|    Siria|       Alemania|
| Política|Venezuela|       Colombia|
|  Laboral|    India|Emiratos Árabes|
|Económica|Argentina|         España|
+---------+---------+---------------+



4. Aplicación de MLlib para predicción de flujos migratorios (3 puntos)
- Convierte los datos en un formato adecuado para MLlib.
- Aplica un modelo de regresión logística para predecir la probabilidad de migración basada en factores socioeconómicos.
- Evalúa el modelo y analiza su precisión.

In [46]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

assembler = VectorAssembler(inputCols=["PIB_Origen", "PIB_Destino", "Tasa_Desempleo_Origen", "Tasa_Desempleo_Destino", "Nivel_Educativo_Origen", "Nivel_Educativo_Destino", "Población_Origen", "Población_Destino"], outputCol="features")
df2 = assembler.transform(df)

train_data, test_data = df2.randomSplit([0.6, 0.4], seed=42)

lr = LogisticRegression(labelCol="ID", featuresCol="features")

model = lr.fit(train_data)

predictions = model.transform(test_data)

predictions.select("ID", "probability", "prediction").show()

evaluator = MulticlassClassificationEvaluator(labelCol="ID", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

+---+--------------------+----------+
| ID|         probability|prediction|
+---+--------------------+----------+
|  1|[2.74480218392475...|       2.0|
|  3|[1.46866172141575...|       4.0|
|  5|[4.08616647707328...|       2.0|
+---+--------------------+----------+

Accuracy: 0.0


El modelo no logra presición, debido tal vez a que se ingresa una cantidad muy baja de filas para evaluar al modelo.