<a href="https://colab.research.google.com/github/NicoFuentese/Ciencia_De_Datos_TD2025/blob/main/9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# === 0) Instalación y arranque de Spark en Colab ===
# (Si ya está instalado, puedes omitir)
!pip -q install pyspark==3.5.1

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MigracionesSpark").getOrCreate()

spark

In [None]:
# === 1) Carga de datos, RDD y DataFrame ===
import os
from pyspark.sql.types import (StructType, StructField, IntegerType, StringType, DoubleType, LongType)

CSV_PATH = "migraciones.csv"  # Subir a Colab o dejar en la raíz si ya está
# Si usas Drive, monta y ajusta CSV_PATH

# Esquema explícito (recomendado para datasets grandes)
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Origen", StringType(), True),
    StructField("Destino", StringType(), True),
    StructField("Año", IntegerType(), True),
    StructField("Razón", StringType(), True),
    StructField("PIB_Origen", DoubleType(), True),
    StructField("PIB_Destino", DoubleType(), True),
    StructField("Tasa_Desempleo_Origen", DoubleType(), True),
    StructField("Tasa_Desempleo_Destino", DoubleType(), True),
    StructField("Nivel_Educativo_Origen", DoubleType(), True),
    StructField("Nivel_Educativo_Destino", DoubleType(), True),
    StructField("Población_Origen", LongType(), True),
    StructField("Población_Destino", LongType(), True),
])

df = spark.read.csv(CSV_PATH, header=True, schema=schema)

# RDD subyacente
rdd = df.rdd

print("Primeras filas (DataFrame):")
df.show(5, truncate=False)

print("\nEsquema:")
df.printSchema()

print("\nEstadísticas descriptivas (numéricas):")
df.describe().show(truncate=False)

Primeras filas (DataFrame):
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|ID |Origen   |Destino        |Año |Razón    |PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|1  |México   |EEUU           |2015|Económica|8900.0    |62000.0    |5.2                  |3.8                   |8.5                   |12.3                   |125000000       |331000000        |
|2  |Siria    |Alemania       |2016|Conflicto|2500.0    |45000.0    |15.4                 |4.5                   |6.2                   |14.1                   |18000000        |830000

In [None]:
# === 2) Procesamiento con RDDs: transformaciones y acciones ===
# filter: mantener registros del año >= 2015 y <= 2022 (por ejemplo)
rdd_filtrado = rdd.filter(lambda row: row.Año is not None and 2015 <= row.Año <= 2022)

# map: proyectar a (Origen, Destino, Razón)
rdd_mapeado = rdd_filtrado.map(lambda row: (row.Origen, row.Destino, row.Razón))

# flatMap: separar palabras de la razón (para ejemplo)
rdd_razon_palabras = rdd_filtrado.flatMap(lambda row: row.Razón.split())

# Acciones
print("Cantidad total de registros:", rdd.count())
print("Cantidad tras filtro (2015-2022):", rdd_filtrado.count())

print("Ejemplo .take:")
print(rdd_mapeado.take(5))

print("Frecuencia de palabras en Razón (top 10):")
from collections import Counter
cont = Counter(rdd_razon_palabras.collect())
print(cont.most_common(10))

Cantidad total de registros: 5
Cantidad tras filtro (2015-2022): 5
Ejemplo .take:
[('México', 'EEUU', 'Económica'), ('Siria', 'Alemania', 'Conflicto'), ('Venezuela', 'Colombia', 'Política'), ('India', 'Emiratos Árabes', 'Laboral'), ('Argentina', 'España', 'Económica')]
Frecuencia de palabras en Razón (top 10):
[('Económica', 2), ('Conflicto', 1), ('Política', 1), ('Laboral', 1)]


In [None]:
# === DataFrames: filtros, agregaciones, ordenamientos y escritura Parquet ===
from pyspark.sql import functions as F

# Filtro: migraciones económicas recientes
df_econ = df.filter((F.col("Razón") == "Económica") & (F.col("Año") >= 2015))
df_econ.show(5, truncate=False)

# Agregación: top países de origen por conteo
origen_top = (df.groupBy("Origen")
                .agg(F.count("*").alias("Migraciones"))
                .orderBy(F.desc("Migraciones")))
origen_top.show(10, truncate=False)

# Agregación: top destinos por conteo
destino_top = (df.groupBy("Destino")
                 .agg(F.count("*").alias("Migraciones"))
                 .orderBy(F.desc("Migraciones")))
destino_top.show(10, truncate=False)

# Agregación: razones más frecuentes
razon_top = (df.groupBy("Razón")
               .agg(F.count("*").alias("Frecuencia"))
               .orderBy(F.desc("Frecuencia")))
razon_top.show(truncate=False)

# Escritura en Parquet (carpeta de salida)
out_dir = "salidas_parquet"
(origen_top.write.mode("overwrite").parquet(f"{out_dir}/origen_top.parquet"))
(destino_top.write.mode("overwrite").parquet(f"{out_dir}/destino_top.parquet"))
(razon_top.write.mode("overwrite").parquet(f"{out_dir}/razon_top.parquet"))

print("Parquet escrito en:", out_dir)

+---+---------+-------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|ID |Origen   |Destino|Año |Razón    |PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_Origen|Población_Destino|
+---+---------+-------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|1  |México   |EEUU   |2015|Económica|8900.0    |62000.0    |5.2                  |3.8                   |8.5                   |12.3                   |125000000       |331000000        |
|5  |Argentina|España |2019|Económica|10000.0   |34000.0    |9.5                  |13.2                  |11.5                  |13.8                   |45000000        |47000000         |
+---+---------+-------+----+---------+----------+------

In [None]:
# === 3) Spark SQL: registro de vista y consultas ===
df.createOrReplaceTempView("migraciones")

# Principales países de origen y destino
sql_origen = spark.sql('''
SELECT Origen, COUNT(*) AS Migraciones
FROM migraciones
GROUP BY Origen
ORDER BY Migraciones DESC
LIMIT 10
''')
sql_origen.show(truncate=False)

sql_destino = spark.sql('''
SELECT Destino, COUNT(*) AS Migraciones
FROM migraciones
GROUP BY Destino
ORDER BY Migraciones DESC
LIMIT 10
''')
sql_destino.show(truncate=False)

# Razones de migración por "región": si no hay región, ofrecemos un mapeo simple de ejemplo.
# Puedes editar el diccionario para tu dataset real.
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

region_map = {
    "México": "América Latina", "EEUU": "Norteamérica",
    "Siria": "Asia Occidental", "Alemania": "Europa",
    "Venezuela": "América Latina", "Colombia": "América Latina",
    "India": "Asia Meridional", "Emiratos Árabes": "Asia Occidental",
    "Argentina": "América Latina", "España": "Europa"
}

@udf(StringType())
def region_de(pais):
    if pais is None:
        return None
    return region_map.get(pais, "Otra/No especificada")

# Rename the column in the DataFrame before creating the view
df_reg = df.withColumnRenamed("Razón", "Razon").withColumn("Region_Destino", region_de(F.col("Destino")))
df_reg.createOrReplaceTempView("migraciones_region")

sql_razon_region = spark.sql('''
SELECT Region_Destino, Razon, COUNT(*) AS Frecuencia
FROM migraciones_region
GROUP BY Region_Destino, Razon
ORDER BY Region_Destino, Frecuencia DESC
''')
sql_razon_region.show(truncate=False)

+---------+-----------+
|Origen   |Migraciones|
+---------+-----------+
|Argentina|1          |
|Siria    |1          |
|India    |1          |
|Venezuela|1          |
|México   |1          |
+---------+-----------+

+---------------+-----------+
|Destino        |Migraciones|
+---------------+-----------+
|España         |1          |
|Emiratos Árabes|1          |
|Alemania       |1          |
|EEUU           |1          |
|Colombia       |1          |
+---------------+-----------+

+---------------+---------+----------+
|Region_Destino |Razon    |Frecuencia|
+---------------+---------+----------+
|América Latina |Política |1         |
|Asia Occidental|Laboral  |1         |
|Europa         |Económica|1         |
|Europa         |Conflicto|1         |
|Norteamérica   |Económica|1         |
+---------------+---------+----------+



In [None]:
# === 4) MLlib: Regresión Logística para probabilidad de razón económica ===
# Label binaria: 1 si Razón == 'Económica', 0 si no.
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import functions as F

df_ml = df.select(
    "Razón", "PIB_Origen", "PIB_Destino", "Tasa_Desempleo_Origen", "Tasa_Desempleo_Destino",
    "Nivel_Educativo_Origen", "Nivel_Educativo_Destino", "Población_Origen", "Población_Destino"
).dropna()

# Create a binary label: 1 if Razón is 'Económica', 0 otherwise
df_ml = df_ml.withColumn("label_bin", F.when(F.col("Razón") == "Económica", 1.0).otherwise(0.0))

features = ["PIB_Origen", "PIB_Destino", "Tasa_Desempleo_Origen", "Tasa_Desempleo_Destino",
            "Nivel_Educativo_Origen", "Nivel_Educativo_Destino", "Población_Origen", "Población_Destino"]
assembler = VectorAssembler(inputCols=features, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)

lr = LogisticRegression(featuresCol="features", labelCol="label_bin", maxIter=100, regParam=0.01, elasticNetParam=0.0)

# Remove StringIndexer from the pipeline as we created the binary label manually
pipeline = Pipeline(stages=[assembler, scaler, lr])

train, test = df_ml.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)
pred = model.transform(test)

evaluator_roc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label_bin", metricName="areaUnderROC")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label_bin", predictionCol="prediction", metricName="accuracy")

auc = evaluator_roc.evaluate(pred)
acc = evaluator_acc.evaluate(pred)

print(f"AUC: {auc:.3f} | Accuracy: {acc:.3f}")
pred.select("label_bin", "probability", "prediction").show(10, truncate=False)

AUC: 1.000 | Accuracy: 0.000
+---------+---------------------------------------+----------+
|label_bin|probability                            |prediction|
+---------+---------------------------------------+----------+
|1.0      |[0.7842112265359532,0.2157887734640468]|0.0       |
+---------+---------------------------------------+----------+



In [None]:
# === 5) Guardado de artefactos y reflexiones ===
# Guardar vistas de SQL en Parquet también
out_dir_sql = "salidas_sql"
(sql_origen.write.mode("overwrite").parquet(f"{out_dir_sql}/top_origen.parquet"))
(sql_destino.write.mode("overwrite").parquet(f"{out_dir_sql}/top_destino.parquet"))
(sql_razon_region.write.mode("overwrite").parquet(f"{out_dir_sql}/razon_por_region.parquet"))

# Plantilla de reflexiones
reflexiones = """Reflexiones analíticas – Migraciones (borrador)

1) Hallazgos principales:
   - Top orígenes, top destinos, razones predominantes por región/destino.

2) Interpretación socioeconómica:
   - Cómo se relacionan PIB, desempleo y niveles educativos con la razón económica.

3) Modelo MLlib (Regresión Logística):
   - AUC: ____  |  Accuracy: ____
   - Variables que más impactaron (comente según coeficientes del modelo y sentido común).

4) Limitaciones y mejoras:
   - Calidad/ausencia de 'Región', más features (conflictos, políticas migratorias), ventanas temporales, tuning y validación cruzada.
"""

with open("reflexiones_migraciones.txt", "w", encoding="utf-8") as f:
    f.write(reflexiones)

print("Archivos escritos:")
print(" - salidas_parquet/* (agregaciones)")
print(" - salidas_sql/* (consultas SQL)")
print(" - reflexiones_migraciones.txt")

Archivos escritos:
 - salidas_parquet/* (agregaciones)
 - salidas_sql/* (consultas SQL)
 - reflexiones_migraciones.txt
