In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import os

In [7]:
# ------------------------------
# 0. SparkSession
# ------------------------------
spark = (
    SparkSession.builder
    .appName("EvaluacionFinal-Migraciones")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

# Paths
INPUT_CSV = "migraciones.csv"
OUT_DIR = "out_parquet"           # carpeta de salida para Parquet
os.makedirs(OUT_DIR, exist_ok=True)

In [8]:
# ------------------------------
# 1. CARGA Y EXPLORACIÓN
# ------------------------------
# DataFrame
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(INPUT_CSV)
)

print("\n=== Primeras filas (DataFrame.show) ===")
df.show(10, truncate=False)

print("\n=== Esquema (printSchema) ===")
df.printSchema()

print("\n=== Estadísticas descriptivas (describe) ===")
df.describe().show(truncate=False)

# RDD
rdd = df.rdd
print("\n=== Primeros 3 elementos del RDD ===")
print(rdd.take(3))


=== Primeras filas (DataFrame.show) ===
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|ID |Origen   |Destino        |Año |Razón    |PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|1  |México   |EEUU           |2015|Económica|8900      |62000      |5.2                  |3.8                   |8.5                   |12.3                   |125000000       |331000000        |
|2  |Siria    |Alemania       |2016|Conflicto|2500      |45000      |15.4                 |4.5                   |6.2                   |14.1                   |18000000  

In [9]:
# ------------------------------
# 2. PROCESAMIENTO con RDDs y DataFrames
# ------------------------------
# 2.1 Transformaciones sobre RDD: filter, map, flatMap
#    - Ej: filtrar registros con PIB_Destino > PIB_Origen
rdd_filtrado = rdd.filter(lambda row: row.PIB_Destino > row.PIB_Origen)

#    - map: quedarnos con tuplas (Origen, Destino, Año)
rdd_mapeado = rdd_filtrado.map(lambda row: (row.Origen, row.Destino, row.Año))

#    - flatMap: separar Razón (aunque sea una sola por fila); ejemplo didáctico
rdd_flat = rdd.flatMap(lambda row: [(row.ID, r) for r in [row.Razón]])

# 2.2 Acciones sobre RDD: collect, take, count
print("\n=== RDD acciones ===")
print("count rdd:", rdd.count())
print("count rdd_filtrado:", rdd_filtrado.count())
print("take(5) rdd_mapeado:", rdd_mapeado.take(5))
print("collect sample rdd_flat (hasta 5):", rdd_flat.take(5))

# 2.3 Operaciones con DataFrames: filtrado, agregaciones, ordenamiento
df_filtrado = df.filter(F.col("PIB_Destino") > F.col("PIB_Origen"))

df_agg = (
    df.groupBy("Origen")
      .agg(
          F.count("*").alias("num_registros"),
          F.avg("PIB_Origen").alias("PIB_Origen_prom"),
          F.avg("PIB_Destino").alias("PIB_Destino_prom")
      )
      .orderBy(F.desc("num_registros"))
)

print("\n=== Agregación por Origen ===")
df_agg.show(truncate=False)

# 2.4 Escribir resultados en Parquet
(
    df_filtrado
    .write
    .mode("overwrite")
    .parquet(os.path.join(OUT_DIR, "df_filtrado.parquet"))
)
(
    df_agg
    .write
    .mode("overwrite")
    .parquet(os.path.join(OUT_DIR, "df_agg_por_origen.parquet"))
)

print(f"\n[OK] Resultados escritos en carpeta Parquet: {OUT_DIR}/")


=== RDD acciones ===
count rdd: 5
count rdd_filtrado: 5
take(5) rdd_mapeado: [('México', 'EEUU', 2015), ('Siria', 'Alemania', 2016), ('Venezuela', 'Colombia', 2017), ('India', 'Emiratos Árabes', 2018), ('Argentina', 'España', 2019)]
collect sample rdd_flat (hasta 5): [(1, 'Económica'), (2, 'Conflicto'), (3, 'Política'), (4, 'Laboral'), (5, 'Económica')]

=== Agregación por Origen ===
+---------+-------------+---------------+----------------+
|Origen   |num_registros|PIB_Origen_prom|PIB_Destino_prom|
+---------+-------------+---------------+----------------+
|Argentina|1            |10000.0        |34000.0         |
|Siria    |1            |2500.0         |45000.0         |
|India    |1            |2200.0         |43000.0         |
|Venezuela|1            |6000.0         |15000.0         |
|México   |1            |8900.0         |62000.0         |
+---------+-------------+---------------+----------------+


[OK] Resultados escritos en carpeta Parquet: out_parquet/


In [10]:
# ------------------------------
# 3. CONSULTAS con Spark SQL
# ------------------------------
df.createOrReplaceTempView("migraciones")

print("\n=== Principales países de origen (Top-N) ===")
spark.sql("""
    SELECT Origen, COUNT(*) AS total
    FROM migraciones
    GROUP BY Origen
    ORDER BY total DESC
""").show(truncate=False)

print("\n=== Principales países de destino (Top-N) ===")
spark.sql("""
    SELECT Destino, COUNT(*) AS total
    FROM migraciones
    GROUP BY Destino
    ORDER BY total DESC
""").show(truncate=False)

# "Razones por región": creamos un mapeo simple país->región (didáctico)
region_map = {
    # América
    "México":"América", "EEUU":"América", "Estados Unidos":"América",
    "Venezuela":"América", "Colombia":"América", "Argentina":"América",
    "España":"Europa", "Alemania":"Europa",
    "India":"Asia", "Emiratos Árabes":"Asia", "Siria":"Asia"
}
map_broadcast = spark.sparkContext.broadcast(region_map)

@F.udf(returnType=StringType())
def pais_a_region(pais):
    if pais is None:
        return "Desconocido"
    return map_broadcast.value.get(pais, "Desconocido")

df_regiones = (
    df.withColumn("Region_Origen", pais_a_region(F.col("Origen")))
      .withColumn("Region_Destino", pais_a_region(F.col("Destino")))
)
df_regiones.createOrReplaceTempView("migraciones_regiones")

print("\n=== Principales razones de migración por región de ORIGEN ===")
spark.sql("""
    SELECT `Region_Origen` AS Region, `Razón`, COUNT(*) AS total
    FROM migraciones_regiones
    GROUP BY `Region_Origen`, `Razón`
    ORDER BY Region, total DESC
""").show(truncate=False)

print("\n=== Principales razones de migración por región de DESTINO ===")
spark.sql("""
    SELECT `Region_Destino` AS Region, `Razón`, COUNT(*) AS total
    FROM migraciones_regiones
    GROUP BY `Region_Destino`, `Razón`
    ORDER BY Region, total DESC
""").show(truncate=False)


=== Principales países de origen (Top-N) ===
+---------+-----+
|Origen   |total|
+---------+-----+
|Argentina|1    |
|Siria    |1    |
|India    |1    |
|Venezuela|1    |
|México   |1    |
+---------+-----+


=== Principales países de destino (Top-N) ===
+---------------+-----+
|Destino        |total|
+---------------+-----+
|España         |1    |
|Emiratos Árabes|1    |
|Alemania       |1    |
|EEUU           |1    |
|Colombia       |1    |
+---------------+-----+


=== Principales razones de migración por región de ORIGEN ===
+-------+---------+-----+
|Region |Razón    |total|
+-------+---------+-----+
|América|Económica|2    |
|América|Política |1    |
|Asia   |Conflicto|1    |
|Asia   |Laboral  |1    |
+-------+---------+-----+


=== Principales razones de migración por región de DESTINO ===
+-------+---------+-----+
|Region |Razón    |total|
+-------+---------+-----+
|América|Política |1    |
|América|Económica|1    |
|Asia   |Laboral  |1    |
|Europa |Económica|1    |
|Europa |

In [11]:
# ------------------------------
# 4. MLlib: Predicción
#    Modelo: Regresión Logística MULTINOMIAL para predecir 'Razón'
# ------------------------------
# Selección de features:
# numéricas (ejemplo): PIB_Origen, PIB_Destino, Tasa_Desempleo_Origen, Tasa_Desempleo_Destino,
#                      Nivel_Educativo_Origen, Nivel_Educativo_Destino,
#                      Población_Origen, Población_Destino, Año
# categóricas: Origen, Destino
numeric_feats = [
    "PIB_Origen", "PIB_Destino",
    "Tasa_Desempleo_Origen", "Tasa_Desempleo_Destino",
    "Nivel_Educativo_Origen", "Nivel_Educativo_Destino",
    "Población_Origen", "Población_Destino",
    "Año"
]
cat_feats = ["Origen", "Destino"]

# Indexación de label (Razón) y categóricas
label_indexer = StringIndexer(inputCol="Razón", outputCol="label", handleInvalid="keep")

indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in cat_feats
]

encoders = [
    OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_oh")
    for c in cat_feats
]

assembler = VectorAssembler(
    inputCols=numeric_feats + [f"{c}_oh" for c in cat_feats],
    outputCol="features",
    handleInvalid="keep"
)

# Regresión Logística multinomial
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=200)

pipeline = Pipeline(stages=[label_indexer] + indexers + encoders + [assembler, lr])

# Train/test split
train_df, test_df = df_regiones.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_df)
pred = model.transform(test_df)

print("\n=== Predicciones MLlib (muestra) ===")
pred.select("Origen","Destino","Razón","label","prediction","probability").show(truncate=False)

# Evaluación
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="accuracy")
evaluator_f1  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="f1")

acc = evaluator_acc.evaluate(pred)
f1  = evaluator_f1.evaluate(pred)

print("\n=== Métricas del modelo (Regresión Logística multinomial) ===")
print(f"Accuracy: {acc:.4f}")
print(f"F1-score: {f1:.4f}")

# Nota analítica breve:
print("\n--- Reflexión breve ---")
print("* Dado el tamaño pequeño del dataset de ejemplo, las métricas pueden ser inestables.")
print("* El enfoque multinomial permite modelar directamente 'Razón' como multiclase.")
print("* Para mejorar: más datos, enriquecer features (p. ej., diferenciales PIB/Desempleo/Educación),")
print("  e incorporar regularización y validación cruzada con ParamGridBuilder + CrossValidator.")

# Guardar también el DataFrame con regiones a Parquet
(
    df_regiones
    .write
    .mode("overwrite")
    .parquet(os.path.join(OUT_DIR, "migraciones_con_regiones.parquet"))
)

print(f"\n[OK] Parquet adicional escrito en: {OUT_DIR}/migraciones_con_regiones.parquet")

spark.stop()


=== Predicciones MLlib (muestra) ===
+---------+--------+--------+-----+----------+-----------------------------------------------------------------------------------+
|Origen   |Destino |Razón   |label|prediction|probability                                                                        |
+---------+--------+--------+-----+----------+-----------------------------------------------------------------------------------+
|Venezuela|Colombia|Política|3.0  |0.0       |[0.993839593249123,0.0011493354066052879,0.004932851585347325,7.821975892443429E-5]|
+---------+--------+--------+-----+----------+-----------------------------------------------------------------------------------+


=== Métricas del modelo (Regresión Logística multinomial) ===
Accuracy: 0.0000
F1-score: 0.0000

--- Reflexión breve ---
* Dado el tamaño pequeño del dataset de ejemplo, las métricas pueden ser inestables.
* El enfoque multinomial permite modelar directamente 'Razón' como multiclase.
* Para mejorar: más 