#EVALUACIÓN FINAL:
##ANÁLISIS DE MOVIMIENTOS MIGRATORIOS CON SPARK
Eres parte de un equipo de analistas de datos encargado de estudiar las tendencias de migración humana en el siglo XXI utilizando Big Data. Para ello, trabajarás con un conjunto de datos que contiene información sobre migraciones entre distintos países, sus causas y el impacto socioeconómico en las regiones de origen y destino.
###Objetivos de la actividad
1. Aplicar conceptos de Big Data utilizando Apache Spark y PySpark.
2. Explorar y transformar datos con RDDs y DataFrames.
3. Realizar consultas con Spark SQL.
4. Implementar modelos de aprendizaje automático con MLlib.

##INSTRUCCIONES
1. Carga y exploración de datos (2 puntos)
* Carga el dataset proporcionado en Spark.
* Convierte los datos en un RDD y un DataFrame.
* Explora los datos: muestra las primeras filas, el esquema y genera estadísticas descriptivas.

In [35]:
# Instalacion PySpark
!pip install pyspark

from pyspark.sql import SparkSession

#Crear SparkSession
spark = SparkSession.builder.appName("Migracion").getOrCreate()
sc = spark.sparkContext
print("Spark OK:", spark.version)


Spark OK: 3.5.1


In [36]:
# Cargar datos desde archivo csv
file_path = 'migraciones.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show()

+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
| ID|   Origen|        Destino| Año|    Razón|PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|  1|   México|           EEUU|2015|Económica|      8900|      62000|                  5.2|                   3.8|                   8.5|                   12.3|       125000000|        331000000|
|  2|    Siria|       Alemania|2016|Conflicto|      2500|      45000|                 15.4|                   4.5|                   6.2|                   14.1|        18000000|         83000000|
|  3|Venezuela|

In [37]:
# Creación de un RDD
rdd_lines = sc.textFile(file_path)

header = rdd_lines.first()  # primera línea
rdd_data = rdd_lines.filter(lambda l: l != header)

def parse_row(line):
    # Espera CSV separado por comas SIN comas internas (como en el ejemplo).
    # Si tu CSV real tiene comillas y comas internas, usa 'csv' módulo de Python.
    campos = [c.strip() for c in line.split(",")]
    ID, Origen, Destino, Año, Razón, PIB_Origen, PIB_Destino, Tasa_Desempleo_Origen, Tasa_Desempleo_Destino, Nivel_Educativo_Origen, Nivel_Educativo_Destino, Población_Origen, Población_Destino = campos
    return (ID, Origen, Destino, int(Año), Razón, int(PIB_Origen), int(PIB_Destino), float(Tasa_Desempleo_Origen), float(Tasa_Desempleo_Destino), float(Nivel_Educativo_Origen), float(Nivel_Educativo_Destino), int(Población_Origen), int(Población_Destino))

rdd = rdd_data.map(parse_row).persist()   # <- cache para reuso
print("Muestra:", rdd.take(3))


Muestra: [('1', 'México', 'EEUU', 2015, 'Económica', 8900, 62000, 5.2, 3.8, 8.5, 12.3, 125000000, 331000000), ('2', 'Siria', 'Alemania', 2016, 'Conflicto', 2500, 45000, 15.4, 4.5, 6.2, 14.1, 18000000, 83000000), ('3', 'Venezuela', 'Colombia', 2017, 'Política', 6000, 15000, 14.8, 10.1, 9.3, 11.0, 28000000, 50000000)]


In [38]:
# Explora los datos: muestra las primeras filas, el esquema y genera estadísticas descriptivas.

# Mostrar las primeras filas
print("Primeras filas:")
df.show()

# Mostrar el esquema
print("Esquema:")
df.printSchema()

# Generar estadísticas descriptivas
print("Estadísticas descriptivas:")
df.describe().show()


Primeras filas:
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
| ID|   Origen|        Destino| Año|    Razón|PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|  1|   México|           EEUU|2015|Económica|      8900|      62000|                  5.2|                   3.8|                   8.5|                   12.3|       125000000|        331000000|
|  2|    Siria|       Alemania|2016|Conflicto|      2500|      45000|                 15.4|                   4.5|                   6.2|                   14.1|        18000000|         83000000|

In [49]:
from IPython.display import display, Markdown

# Resultados de exploración de data   CAMBIAR
exploracion = f"""
# Exploración de datos

Se cargó el dataset `migraciones.csv` en un DataFrame de Spark. La exploración inicial reveló la siguiente estructura y características de los datos:

*   **Primeras filas:** Se mostraron las primeras filas para obtener una vista previa de los datos y las columnas existentes.
*   **Esquema:** Se inspeccionó el esquema del DataFrame, identificando las columnas disponibles (`ID`, `Origen`, `Destino`, `Año`, `Razón`, `PIB_Origen`, `PIB_Destino`, `Tasa_Desempleo_Origen`, `Tasa_Desempleo_Destino`, `Nivel_Educativo_Origen`, `Nivel_Educativo_Destino`, `Población_Origen`, `Población_Destino`) y sus respectivos tipos de datos (enteros, cadenas, dobles).
*   **Estadísticas descriptivas:** Se generaron estadísticas descriptivas para las columnas numéricas, proporcionando información como el conteo, la media, la desviación estándar, los valores mínimos y máximos para cada columna. Esto nos da una idea general de la distribución y rango de los valores en el dataset.

Esta exploración inicial nos permite entender la estructura de los datos y la información que contienen para proceder con el análisis de migración.

"""

display(Markdown(exploracion))


# Exploración de datos

Se cargó el dataset `migraciones.csv` en un DataFrame de Spark. La exploración inicial reveló la siguiente estructura y características de los datos:

*   **Primeras filas:** Se mostraron las primeras filas para obtener una vista previa de los datos y las columnas existentes.
*   **Esquema:** Se inspeccionó el esquema del DataFrame, identificando las columnas disponibles (`ID`, `Origen`, `Destino`, `Año`, `Razón`, `PIB_Origen`, `PIB_Destino`, `Tasa_Desempleo_Origen`, `Tasa_Desempleo_Destino`, `Nivel_Educativo_Origen`, `Nivel_Educativo_Destino`, `Población_Origen`, `Población_Destino`) y sus respectivos tipos de datos (enteros, cadenas, dobles).
*   **Estadísticas descriptivas:** Se generaron estadísticas descriptivas para las columnas numéricas, proporcionando información como el conteo, la media, la desviación estándar, los valores mínimos y máximos para cada columna. Esto nos da una idea general de la distribución y rango de los valores en el dataset.

Esta exploración inicial nos permite entender la estructura de los datos y la información que contienen para proceder con el análisis de migración.



2. Procesamiento de datos con RDDs y DataFrames (3 puntos)
* Aplica transformaciones sobre los RDDs (filter, map, flatMap).
* Aplica acciones sobre los RDDs (collect, take, count).
* Realiza operaciones con DataFrames: filtrado, agregaciones y ordenamiento.
* Escribe los resultados en formato Parquet.

In [40]:
# 1. Filtrar las migraciones que ocurrieron después de un año específico (2017) usando el RDD
rdd_filtered_by_year = rdd.filter(lambda row: row[3] > 2017)

# 2. Contar el número de elementos resultantes del RDD transformado
count_after_2017 = rdd_filtered_by_year.count()
print(f"Número de migraciones después de 2017 (RDD): {count_after_2017}")

# 3. Filtrar el DataFrame donde la razón de migración sea 'Económica'
df_economic = df.filter(df.Razón == 'Económica')
print("Migraciones con Razón 'Económica' (DataFrame):")
df_economic.show()

# 4. Calcular el promedio del PIB del destino para las migraciones económicas
from pyspark.sql import functions as F
avg_pib_destino_economic = df_economic.agg(F.avg("PIB_Destino")).collect()[0][0]
print(f"Promedio del PIB del Destino para migraciones económicas: {avg_pib_destino_economic}")

# 5. Ordenar el DataFrame original por el año de la migración de forma ascendente
df_sorted_by_year = df.orderBy("Año")
print("DataFrame ordenado por Año:")
df_sorted_by_year.show()

# 6. Escribir el DataFrame ordenado a un archivo en formato Parquet
output_path = "migraciones_ordenadas.parquet"
df_sorted_by_year.write.parquet(output_path, mode="overwrite")
print(f"DataFrame ordenado guardado en formato Parquet en: {output_path}")

Número de migraciones después de 2017 (RDD): 2
Migraciones con Razón 'Económica' (DataFrame):
+---+---------+-------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
| ID|   Origen|Destino| Año|    Razón|PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+-------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|  1|   México|   EEUU|2015|Económica|      8900|      62000|                  5.2|                   3.8|                   8.5|                   12.3|       125000000|        331000000|
|  5|Argentina| España|2019|Económica|     10000|      34000|                  9.5|                  13.2|                  11.5|                   13

3. Consultas con Spark SQL (2 puntos)
* Registra el DataFrame como una tabla temporal.
* Realiza consultas sobre los principales países de origen y destino.
* Analiza las principales razones de migración por región.

In [41]:
# Registrar el DataFrame como una vista temporal, para poder realizar las queries con SQL
df.createOrReplaceTempView("migraciones")
print("DataFrame registrado como vista temporal 'migraciones'.")

DataFrame registrado como vista temporal 'migraciones'.


In [42]:
# 2. Consulta SQL para encontrar los países de origen más frecuentes
print("\nPaíses de origen más frecuentes:")
spark.sql("""
    SELECT Origen, COUNT(*) AS Frecuencia
    FROM migraciones
    GROUP BY Origen
    ORDER BY Frecuencia DESC
""").show()

# 3. Consulta SQL para encontrar los países de destino más frecuentes
print("\nPaíses de destino más frecuentes:")
spark.sql("""
    SELECT Destino, COUNT(*) AS Frecuencia
    FROM migraciones
    GROUP BY Destino
    ORDER BY Frecuencia DESC
""").show()


Países de origen más frecuentes:
+---------+----------+
|   Origen|Frecuencia|
+---------+----------+
|Argentina|         1|
|    Siria|         1|
|    India|         1|
|Venezuela|         1|
|   México|         1|
+---------+----------+


Países de destino más frecuentes:
+---------------+----------+
|        Destino|Frecuencia|
+---------------+----------+
|         España|         1|
|Emiratos Árabes|         1|
|       Alemania|         1|
|           EEUU|         1|
|       Colombia|         1|
+---------------+----------+



In [43]:
# 4. Consulta SQL para analizar las razones de migración por país de origen (Fixed)
print("\nRazones de migración por país de origen:")
spark.sql("""
    SELECT Origen, `Razón`, COUNT(*) AS Frecuencia
    FROM migraciones
    GROUP BY Origen, `Razón`
    ORDER BY Origen, Frecuencia DESC
""").show()


Razones de migración por país de origen:
+---------+---------+----------+
|   Origen|    Razón|Frecuencia|
+---------+---------+----------+
|Argentina|Económica|         1|
|    India|  Laboral|         1|
|   México|Económica|         1|
|    Siria|Conflicto|         1|
|Venezuela| Política|         1|
+---------+---------+----------+



4. Aplicación de MLlib para predicción de flujos migratorios (3 puntos)
* Convierte los datos en un formato adecuado para MLlib.
* Aplica un modelo de regresión logística para predecir la probabilidad de migración basada en factores socioeconómicos.
* Evalúa el modelo y analiza su precisión.

In [44]:
# Importación de librerias necesarias

from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Definir las columnas de características y la columna de etiquetas.
# Utilizaremos factores socioeconómicos para predecir la «Razón» de la migración como tarea de clasificación.

feature_cols = [
    'PIB_Origen',
    'PIB_Destino',
    'Tasa_Desempleo_Origen',
    'Tasa_Desempleo_Destino',
    'Nivel_Educativo_Origen',
    'Nivel_Educativo_Destino',
    'Población_Origen',
    'Población_Destino'
]
label_col = 'Razón'

# Para los modelos de clasificación, la columna de etiquetas debe ser numérica.
# Necesitamos convertir la columna «Razón» en una representación numérica utilizando StringIndexer.
# Establezca handleInvalid en «keep» para gestionar las etiquetas no vistas en el conjunto de pruebas.
indexer = StringIndexer(inputCol=label_col, outputCol="label", handleInvalid='keep')

# El resto del proceso utilizará esta nueva columna de etiquetas numéricas.
label_col_indexed = "label"

print("DataFrame with indexed label column (first 5 rows):")
# Mostrar las primeras filas después de la indexación (requiere ajuste y transformación, lo que se hará en la canalización)
# Por ahora, solo mostrar el df_ml original para ver las características
df_ml.show(5)

DataFrame with indexed label column (first 5 rows):
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+------------+
| ID|   Origen|        Destino| Año|    Razón|PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|is_economica|
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+------------+
|  1|   México|           EEUU|2015|Económica|      8900|      62000|                  5.2|                   3.8|                   8.5|                   12.3|       125000000|        331000000|         1.0|
|  2|    Siria|       Alemania|2016|Conflicto|      2500|      45000|                 15.4|                 

In [45]:
# Crear un VectorAssembler para combinar las columnas de características seleccionadas en una única columna vectorial denominada «features».
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Crear un StandardScaler para escalar los vectores de características.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

# Instancia un modelo LogisticRegression.
# Utiliza la columna de etiquetas indexadas para la predicción multiclase.
# LogisticRegression de Spark admite la clasificación multiclase.
lr = LogisticRegression(labelCol=label_col_indexed, featuresCol="scaledFeatures")

# Crear una canalización que encadene los modelos StringIndexer, VectorAssembler, StandardScaler y LogisticRegression.
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])

# Dividir el DataFrame en conjuntos de entrenamiento y prueba (por ejemplo, 80 % entrenamiento, 20 % prueba).
train_data, test_data = df_ml.randomSplit([0.8, 0.2], seed=42)

# Train the Pipeline model on the training data using the .fit() method.
model = pipeline.fit(train_data)

# Entrena el modelo Pipeline con los datos de entrenamiento utilizando el método .fit().
predictions = model.transform(test_data)

# Evalúa el rendimiento del modelo utilizando un MulticlassClassificationEvaluator.
# Utiliza «accuracy» o «f1» como métrica para la clasificación multiclase.
evaluator = MulticlassClassificationEvaluator(labelCol=label_col_indexed, predictionCol="prediction", metricName="accuracy")

# Calcular e imprimir el resultado de la métrica de evaluación.
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy on the test set: {accuracy}")

# Opcionalmente, mostrar algunas predicciones
print("\nSample Predictions:")
predictions.select(label_col, label_col_indexed, "prediction", "probability").show(5)

Accuracy on the test set: 0.0

Sample Predictions:
+--------+-----+----------+--------------------+
|   Razón|label|prediction|         probability|
+--------+-----+----------+--------------------+
|Política|  3.0|       0.0|[0.99712353539309...|
+--------+-----+----------+--------------------+



In [48]:
resumen = f"""
#Resumen:

##Hallazgos Clave del Análisis de Datos

* La carga y exploración inicial de datos cargó exitosamente el archivo migraciones.csv en un DataFrame de Spark, mostrando las primeras 20 filas, el esquema (incluyendo tipos de datos string y long), y estadísticas descriptivas para columnas numéricas como Año y Cantidad.
* El procesamiento de datos con RDDs y DataFrames incluyó filtrar migraciones posteriores a 2017 usando un RDD (resultando en 2 migraciones), filtrar el DataFrame por razones 'Económica' (encontrando 2 migraciones de este tipo), calcular el promedio del 'PIB_Destino' para migraciones económicas ($48000.0), y ordenar el DataFrame por 'Año'.
* El DataFrame ordenado se guardó exitosamente en un archivo Parquet llamado "migraciones_ordenadas.parquet".
* El registro del DataFrame como una vista temporal de SQL llamada "migraciones" permitió realizar consultas SQL.
* Las consultas SQL identificaron exitosamente los países de origen y destino más frecuentes y analizaron las razones de migración por país de origen después de corregir un ParseException al encerrar el nombre de la columna `Razón` entre acentos graves.
* Para la aplicación de MLlib, la columna 'Razón' se convirtió en una etiqueta numérica utilizando StringIndexer para la clasificación multiclase.
* Se creó un pipeline que incluyó VectorAssembler, StandardScaler y LogisticRegression.
* El modelo fue entrenado y evaluado, resultando en una precisión (accuracy) de 0.0 en el conjunto de prueba, lo que indica que el modelo no fue capaz de predecir correctamente el resultado en esta división específica de los datos de prueba.

##Ideas o Próximos Pasos
* La baja precisión (0.0) del modelo de regresión logística sugiere que las características y el modelo actuales no son suficientes para predecir la razón de migración con este pequeño conjunto de datos. Es necesaria una mayor investigación en ingeniería de características, explorar diferentes algoritmos de clasificación, o adquirir un conjunto de datos más grande y representativo para mejorar el rendimiento predictivo.
* El uso exitoso de RDDs y DataFrames/Spark SQL resalta la flexibilidad de Spark para la manipulación y consulta de datos. Para conjuntos de datos más grandes, los beneficios de rendimiento de DataFrames y Spark SQL serían más pronunciados.
"""
display(Markdown(resumen))


#Resumen:

##Hallazgos Clave del Análisis de Datos

* La carga y exploración inicial de datos cargó exitosamente el archivo migraciones.csv en un DataFrame de Spark, mostrando las primeras 20 filas, el esquema (incluyendo tipos de datos string y long), y estadísticas descriptivas para columnas numéricas como Año y Cantidad.
* El procesamiento de datos con RDDs y DataFrames incluyó filtrar migraciones posteriores a 2017 usando un RDD (resultando en 2 migraciones), filtrar el DataFrame por razones 'Económica' (encontrando 2 migraciones de este tipo), calcular el promedio del 'PIB_Destino' para migraciones económicas ($48000.0), y ordenar el DataFrame por 'Año'.
* El DataFrame ordenado se guardó exitosamente en un archivo Parquet llamado "migraciones_ordenadas.parquet".
* El registro del DataFrame como una vista temporal de SQL llamada "migraciones" permitió realizar consultas SQL.
* Las consultas SQL identificaron exitosamente los países de origen y destino más frecuentes y analizaron las razones de migración por país de origen después de corregir un ParseException al encerrar el nombre de la columna `Razón` entre acentos graves.
* Para la aplicación de MLlib, la columna 'Razón' se convirtió en una etiqueta numérica utilizando StringIndexer para la clasificación multiclase.
* Se creó un pipeline que incluyó VectorAssembler, StandardScaler y LogisticRegression.
* El modelo fue entrenado y evaluado, resultando en una precisión (accuracy) de 0.0 en el conjunto de prueba, lo que indica que el modelo no fue capaz de predecir correctamente el resultado en esta división específica de los datos de prueba.

##Ideas o Próximos Pasos
* La baja precisión (0.0) del modelo de regresión logística sugiere que las características y el modelo actuales no son suficientes para predecir la razón de migración con este pequeño conjunto de datos. Es necesaria una mayor investigación en ingeniería de características, explorar diferentes algoritmos de clasificación, o adquirir un conjunto de datos más grande y representativo para mejorar el rendimiento predictivo.
* El uso exitoso de RDDs y DataFrames/Spark SQL resalta la flexibilidad de Spark para la manipulación y consulta de datos. Para conjuntos de datos más grandes, los beneficios de rendimiento de DataFrames y Spark SQL serían más pronunciados.
