<a href="https://colab.research.google.com/github/Claudia060392/Portafolio-Ciencia-de-Datos/blob/main/Practicas%20Consolidadas/Modulo_9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
from google.colab import files
import pandas as pd

# Carga del archivo manualmente
archivo = files.upload()

# Lectura del CSV
df = pd.read_csv("migraciones.csv")

# Primeras filas
print(df.head())

Saving migraciones.csv to migraciones.csv
   ID     Origen          Destino   Año      Razón  PIB_Origen  PIB_Destino  \
0   1     México             EEUU  2015  Económica        8900        62000   
1   2      Siria         Alemania  2016  Conflicto        2500        45000   
2   3  Venezuela         Colombia  2017   Política        6000        15000   
3   4      India  Emiratos Árabes  2018    Laboral        2200        43000   
4   5  Argentina           España  2019  Económica       10000        34000   

   Tasa_Desempleo_Origen  Tasa_Desempleo_Destino  Nivel_Educativo_Origen  \
0                    5.2                     3.8                     8.5   
1                   15.4                     4.5                     6.2   
2                   14.8                    10.1                     9.3   
3                    7.2                     1.8                     7.8   
4                    9.5                    13.2                    11.5   

   Nivel_Educativo_Destino

In [4]:
# ================================================================
# EVALUACIÓN FINAL – MÓDULO 9
# Análisis de movimientos migratorios con Spark
# ================================================================

# -----------------------------
# 0) Instalación y sesión Spark
# -----------------------------
# Se instala findspark
!pip -q install findspark

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, count, avg

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Se crea la sesión de Spark
spark = SparkSession.builder.appName("MigracionesSpark").getOrCreate()

# -----------------------------
# 1) Carga y exploración de datos
# -----------------------------

df = spark.read.csv("migraciones.csv", header=True, inferSchema=True)

print("Esquema del DataFrame:")
df.printSchema()

print("Primeras filas:")
df.show(5, truncate=False)

print("Cantidad de filas (DF):", df.count())
print("Columnas detectadas:", df.columns)

print("Estadísticas descriptivas (numéricas):")
df.describe().show()

# Se obtiene un RDD para ejercicios con transformaciones/acciones
rdd = df.rdd
print("Cantidad de registros en RDD:", rdd.count())

# -----------------------------
# 2) Procesamiento con RDDs y DataFrames (versión robusta)
# -----------------------------
# Se imprime vista de columnas reales y un pequeño sample de RDD (seguro)
def rdd_preview(rdd_obj, n=3):
    # Esta función intenta mostrar una muestra del RDD de forma segura.
    try:
        print("Preview RDD:", rdd_obj.take(n))
    except Exception as e:
        print("No fue posible hacer take() del RDD:", e)

rdd_preview(rdd, 3)

# Si existiera una columna 'Impacto' (o similar), se filtra; si no, se muestra un aviso y se continúa.
lower_cols = [c.lower() for c in df.columns]
if "impacto" in lower_cols:
    impacto_col = df.columns[lower_cols.index("impacto")]
    rdd_filtrado = rdd.filter(lambda row: getattr(row, impacto_col) == "Positivo")
    print(f"Ejemplo RDD filtrado por {impacto_col} == 'Positivo':")
    rdd_preview(rdd_filtrado, 3)
else:
    print("No existe la columna 'Impacto'; se omite el ejemplo de filtro por impacto y se continúa.")

# -----------------------------
# 3) Agregaciones DataFrame y exportación a Parquet
# -----------------------------
# Se intentan detectar columnas de origen/destino por nombre (flexible).
col_origen = next((c for c in df.columns if ("Origen" in c) or ("origin" in c.lower())), None)
col_dest  = next((c for c in df.columns if ("Destino" in c) or ("destin" in c.lower())), None)

if col_origen:
    print(f"Top 5 de {col_origen} por conteo:")
    df_origen = df.groupBy(col_origen).count().orderBy(desc("count"))
    df_origen.show(5)
    # Se guarda resultado en Parquet (reemplaza si existe)
    df_origen.write.mode("overwrite").parquet("salida_origen.parquet")
else:
    print("No se encontró columna de Origen; se omite este bloque.")

if col_dest:
    print(f"Top 5 de {col_dest} por conteo:")
    df_destino = df.groupBy(col_dest).count().orderBy(desc("count"))
    df_destino.show(5)
else:
    print("No se encontró columna de Destino; se omite este bloque.")

# -----------------------------
# 4) Consultas con Spark SQL
# -----------------------------
df.createOrReplaceTempView("migraciones")

if col_origen:
    print("Consulta SQL – Top 5 origen:")
    spark.sql(f"""
        SELECT `{col_origen}` AS Origen, COUNT(*) AS total
        FROM migraciones
        GROUP BY `{col_origen}`
        ORDER BY total DESC
        LIMIT 5
    """).show()

if col_dest:
    print("Consulta SQL – Top 5 destino:")
    spark.sql(f"""
        SELECT `{col_dest}` AS Destino, COUNT(*) AS total
        FROM migraciones
        GROUP BY `{col_dest}`
        ORDER BY total DESC
        LIMIT 5
    """).show()

# Se busca una columna de razón para ejemplo por región (si existieran).
col_region = next((c for c in df.columns if ("Region" in c) or ("región" in c) or ("region" in c.lower())), None)
col_razon  = next((c for c in df.columns if ("Razon" in c) or ("Razón" in c) or ("reason" in c.lower())), None)

if col_region and col_razon:
    print("Consulta SQL – Principales razones por región:")
    spark.sql(f"""
        SELECT `{col_region}` AS Region, `{col_razon}` AS Razon, COUNT(*) AS total
        FROM migraciones
        GROUP BY `{col_region}`, `{col_razon}`
        ORDER BY total DESC
    """).show(10)
else:
    print("No se encontraron columnas de Región y/o Razón; se omite esta consulta.")

# -----------------------------
# 5) MLlib – Clasificación binaria (si el dataset lo permite)
# -----------------------------
# Se intenta detectar una columna de etiqueta binaria (1/0) para 'Migracion'.
# Lista de candidatos (ajustar si tu CSV usa otro nombre):
candidatos_label = ["Migracion", "Migración", "Es_Migracion", "EsMigracion", "Es_Migrante", "Migrante", "label"]
label_col = next((c for c in df.columns if c in candidatos_label), None)

if label_col is None:
    print("\n[MLlib] No se encontró una columna binaria de etiqueta (p.ej., 'Migracion').")
    print("[MLlib] Para habilitar el modelo, agrega una columna 0/1 o ajusta 'candidatos_label'.")
else:
    # Se seleccionan predictores típicos (socioeconómicos) y se indexan categóricas comunes.
    # Se detectan columnas potenciales por nombre (flexible):
    posibles_num = []
    for name in df.columns:
        low = name.lower()
        if any(k in low for k in ["pib", "empleo", "educ", "salud", "ingres", "renta"]):
            posibles_num.append(name)
    # Se agregan columnas categóricas de origen/destino si existen (se indexan):
    indexers = []
    df_idx = df

    if col_origen:
        idx_o = StringIndexer(inputCol=col_origen, outputCol="Origen_idx", handleInvalid="keep")
        df_idx = idx_o.fit(df_idx).transform(df_idx)
    if col_dest:
        idx_d = StringIndexer(inputCol=col_dest, outputCol="Destino_idx", handleInvalid="keep")
        df_idx = idx_d.fit(df_idx).transform(df_idx)

    # Se arma la lista final de features: numéricos detectados + índices categóricos disponibles
    features = []
    features += posibles_num
    if "Origen_idx" in df_idx.columns:  features.append("Origen_idx")
    if "Destino_idx" in df_idx.columns: features.append("Destino_idx")

    # Validación mínima de features
    if len(features) == 0:
        print("\n[MLlib] No se detectaron columnas predictoras apropiadas por nombre (PIB/Empleo/Educación/Salud...).")
        print("[MLlib] Ajusta la lista 'posibles_num' o renombra columnas en el CSV.")
    else:
        print("\n[MLlib] Columnas usadas como features:", features)

        # Ensamble de vector de características
        assembler = VectorAssembler(inputCols=features, outputCol="features")

        # Conversión de etiqueta a double
        df_ml = df_idx.withColumn("label", col(label_col).cast("double"))
        df_ml = assembler.transform(df_ml).select("features", "label").na.drop()

        # División train/test
        train, test = df_ml.randomSplit([0.8, 0.2], seed=42)

        # Modelo: Regresión Logística binaria
        lr = LogisticRegression(maxIter=20)
        modelo = lr.fit(train)

        # Evaluación con AUC-ROC
        pred = modelo.transform(test)
        evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
        auc = evaluator.evaluate(pred)

        print(f"[MLlib] AUC ROC del modelo: {auc:.4f}")
        pred.select("features", "label", "prediction", "probability").show(5, truncate=False)



Esquema del DataFrame:
root
 |-- ID: integer (nullable = true)
 |-- Origen: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- Año: integer (nullable = true)
 |-- Razón: string (nullable = true)
 |-- PIB_Origen: integer (nullable = true)
 |-- PIB_Destino: integer (nullable = true)
 |-- Tasa_Desempleo_Origen: double (nullable = true)
 |-- Tasa_Desempleo_Destino: double (nullable = true)
 |-- Nivel_Educativo_Origen: double (nullable = true)
 |-- Nivel_Educativo_Destino: double (nullable = true)
 |-- Población_Origen: integer (nullable = true)
 |-- Población_Destino: integer (nullable = true)

Primeras filas:
+---+---------+---------------+----+---------+----------+-----------+---------------------+----------------------+----------------------+-----------------------+----------------+-----------------+
|ID |Origen   |Destino        |Año |Razón    |PIB_Origen|PIB_Destino|Tasa_Desempleo_Origen|Tasa_Desempleo_Destino|Nivel_Educativo_Origen|Nivel_Educativo_Destino|Población_