# Análisis de migraciones con Spark

Este notebook contiene las instrucciones y el código (celdas **Markdown** y **Código**) para realizar las tareas solicitadas sobre el archivo `/mnt/data/migraciones.csv` usando **PySpark**.

**Notas:**

- Diseñado para ejecutarse en un entorno con `pyspark` disponible (por ejemplo, un cluster Spark, Databricks, o una instalación local con PySpark).  
- Si `pyspark` no está instalado, las celdas incluyen mensajes guía para instalar/activar el entorno.  
- Rutas usadas: dataset en `/mnt/data/migraciones.csv`, salida Parquet en `/mnt/data/migraciones_parquet/`.

Ejecute las celdas en orden.


## 0. Preparar entorno y crear SparkSession

Esta celda crea un `SparkSession`. Si no tiene `pyspark` instalado, aparecerá un error y la celda muestra instrucciones.

In [None]:

# Celda: crear SparkSession
try:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("MigracionesAnalysis") \
        .config("spark.sql.shuffle.partitions", "8") \
        .getOrCreate()
    print("SparkSession iniciada correctamente: ", spark)
except Exception as e:
    print("Error al iniciar SparkSession. Asegúrese de tener pyspark instalado y configurado.")
    print("Error:", e)
    print("\nSi está en un entorno local, puede instalar pyspark con: pip install pyspark")
    print("En entornos gestionados (Databricks, EMR, GCP Dataproc, etc.) no necesita instalar pyspark.")


## 1. Carga y exploración de datos (2 puntos)

- Cargue el CSV en Spark.
- Convierta a RDD y DataFrame.
- Muestre primeras filas, esquema y estadísticas descriptivas.

In [None]:

# Celda: leer CSV como DataFrame de Spark
csv_path = "/mnt/data/migraciones.csv"

# Intentamos leer el CSV con inferSchema para facilitar el análisis.
try:
    df = spark.read.option("header", True).option("inferSchema", True).csv(csv_path)
    print("Dataset cargado en DataFrame 'df' con {} filas".format(df.count()))
except Exception as e:
    print("Error al leer el CSV. Verifique la ruta y el formato. Error:", e)
    df = None

# Convertir a RDD
if df is not None:
    rdd = df.rdd
    print("Se generó rdd a partir de DataFrame.")
else:
    rdd = None

# Mostrar primeras filas
if df is not None:
    display(df.limit(10).toPandas())  # muestra las primeras 10 filas (convierte a pandas para mejor visualización)
    print('\nEsquema del DataFrame:')
    df.printSchema()

    print('\nEstadísticas descriptivas (describe()):')
    display(df.describe().toPandas())
else:
    print('DataFrame no disponible.')


## 2. Procesamiento con RDDs y DataFrames (3 puntos)

### 2.1 Transformaciones y acciones con RDDs

Ejemplos de `filter`, `map`, `flatMap` y acciones `collect`, `take`, `count`. Ajuste las columnas según su dataset.

In [None]:

# Celda: operaciones con RDD (ejemplos)
if rdd is not None:
    # Mostrar esquema de rdd (primer elemento)
    first = rdd.first()
    print("Primer registro (tupla Row):", first)

    # Ejemplo: filtrar filas cuyo país origen no sea nulo (ajuste 'pais_origen' al nombre real)
    # Intentamos detectar columnas comunes
    cols = df.columns
    print("Columnas detectadas:", cols)

    # Buscamos columnas con nombres parecidos a 'origen' y 'destino'
    origen_cols = [c for c in cols if 'origen' in c.lower() or 'or'==c.lower()]
    destino_cols = [c for c in cols if 'destino' in c.lower() or 'dest'==c.lower()]
    print("Posibles columnas de origen:", origen_cols)
    print("Posibles columnas de destino:", destino_cols)

    # Use la primera columna candidata si existe, sino use la primera columna del dataset (solo ejemplo)
    origen_col = origen_cols[0] if origen_cols else cols[0]
    destino_col = destino_cols[0] if destino_cols else (cols[1] if len(cols)>1 else cols[0])

    print("Usando para ejemplos:", origen_col, destino_col)

    # filter: conservar registros donde origen no sea nulo
    filtered = rdd.filter(lambda row: row[origen_col] is not None and str(row[origen_col]).strip()!='')
    print("Registros con origen no nulo:", filtered.count())

    # map: crear pares (origen, 1)
    mapped = filtered.map(lambda row: (row[origen_col], 1))

    # flatMap: si hubiera una columna con múltiples razones separadas por ';', dividirlas
    # buscamos columna con 'razon' en el nombre
    razon_cols = [c for c in cols if 'razon' in c.lower() or 'motivo' in c.lower()]
    print("Posibles columnas de razón de migración:", razon_cols)
    if razon_cols:
        razon_col = razon_cols[0]
        flat = rdd.flatMap(lambda row: str(row[razon_col]).split(';') if row[razon_col] else [])
        print("Ejemplo flatMap (primeros 10):", flat.take(10))
    else:
        print("No se encontró columna de razón para flatMap de ejemplo.")

    # acciones: collect (¡cuidado con tamaños grandes!), take, count
    print("Ejemplo take(5):", mapped.take(5))
else:
    print("RDD no disponible.")


### 2.2 Operaciones con DataFrames: filtrado, agregaciones y ordenamiento

Además, se guardan resultados en formato Parquet.

In [None]:

# Celda: operaciones con DataFrame
import os
output_parquet = "/mnt/data/migraciones_parquet"

if df is not None:
    # Filtrar: ejemplo conservando migraciones después de cierto año si existe 'anio' o 'year'
    year_cols = [c for c in df.columns if 'anio' in c.lower() or 'year' in c.lower() or 'año' in c.lower()]
    print("Posibles columnas de año:", year_cols)
    if year_cols:
        year_col = year_cols[0]
        try:
            df_filtered = df.filter(df[year_col] >= 2000)  # ejemplo
            print("Filtrado por año >= 2000. Registros:", df_filtered.count())
        except Exception as e:
            print("No se pudo filtrar por año (tipo/valores). Error:", e)
            df_filtered = df
    else:
        df_filtered = df

    # Agregaciones: top países origen y destino
    # Usamos las columnas detectadas antes (origen_col, destino_col)
    top_origen = df.groupBy(origen_col).count().orderBy('count', ascending=False)
    top_destino = df.groupBy(destino_col).count().orderBy('count', ascending=False)

    print("Top 10 países de origen:")
    display(top_origen.limit(10).toPandas())

    print("Top 10 países de destino:")
    display(top_destino.limit(10).toPandas())

    # Guardar en Parquet (sobrescribe si existe)
    try:
        if os.path.exists(output_parquet):
            print("Directorio Parquet ya existe en {}. Se recomienda borrarlo manualmente si desea sobrescribir.".format(output_parquet))
        else:
            df.write.mode('overwrite').parquet(output_parquet)
            print("DataFrame escrito en Parquet en:", output_parquet)
    except Exception as e:
        print("Error al escribir Parquet:", e)
else:
    print("DataFrame no disponible para operaciones.")


## 3. Consultas con Spark SQL (2 puntos)

- Registrar el DataFrame como una tabla temporal.
- Consultas sobre principales países de origen/destino.
- Análisis de razones de migración por región.

In [None]:

# Celda: consultas SQL
if df is not None:
    df.createOrReplaceTempView("migraciones")
    print("Tabla temporal 'migraciones' registrada. Ahora puede ejecutar consultas SQL.")

    # Top países origen
    q1 = """SELECT {o} AS origen, COUNT(*) AS cnt
           FROM migraciones
           GROUP BY {o}
           ORDER BY cnt DESC
           LIMIT 10""".format(o=origen_col)
    print("\nTop países de origen (SQL):")
    display(spark.sql(q1).toPandas())

    # Top países destino
    q2 = """SELECT {d} AS destino, COUNT(*) AS cnt
           FROM migraciones
           GROUP BY {d}
           ORDER BY cnt DESC
           LIMIT 10""".format(d=destino_col)
    print("\nTop países de destino (SQL):")
    display(spark.sql(q2).toPandas())

    # Razones por región: buscamos columnas 'region' y 'razon' si existen
    region_cols = [c for c in df.columns if 'region' in c.lower()]
    if region_cols and razon_cols:
        region_col = region_cols[0]
        razon_col = razon_cols[0]
        q3 = """SELECT {reg} AS region, {raz} AS razon, COUNT(*) AS cnt
               FROM migraciones
               GROUP BY {reg}, {raz}
               ORDER BY {reg}, cnt DESC
               LIMIT 50""".format(reg=region_col, raz=razon_col)
        print("\nPrincipales razones por región:")
        display(spark.sql(q3).toPandas())
    else:
        print("No se encontraron columnas 'region' y/o 'razon' para el análisis. Columnas disponibles:", df.columns)
else:
    print("DataFrame no disponible para SQL.")


## 4. Aplicación de MLlib para predicción de flujos migratorios (3 puntos)

- Convertir datos a formato MLlib.
- Aplicar Regression (Logistic Regression) para predecir probabilidad de migración.
- Evaluar el modelo.

**Importante:** el notebook trata de detectar automáticamente una columna etiqueta (por ejemplo `migracion`, `migrante`, `target`). Si no la detecta, edite la variable `label_col` en la siguiente celda.

In [None]:

# Celda: preparación para MLlib y entrenamiento (ejemplo)
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Detectar automáticamente una columna label (heurística)
label_candidates = [c for c in df.columns if any(k in c.lower() for k in ['mig', 'migrant', 'migracion', 'migrate', 'target', 'label'])]
if label_candidates:
    label_col = label_candidates[0]
    print("Label detectada automáticamente:", label_col)
else:
    # Si no encontramos una label, pedir al usuario que edite esta variable manualmente.
    label_col = None
    print("No se detectó columna etiqueta automáticamente. Por favor, asigne manualmente `label_col` al nombre de la columna objetivo. Columnas disponibles:", df.columns)

# Supongamos que label_col es binaria (0/1). Si no es así habrá que transformarla.
if label_col:
    # Seleccionar features numéricas automáticamente
    numeric_cols = [f.name for f in df.schema.fields if str(f.dataType) in ['IntegerType', 'DoubleType', 'LongType', 'FloatType'] and f.name != label_col]
    print("Características numéricas detectadas:", numeric_cols)

    if not numeric_cols:
        print("No se detectaron columnas numéricas. Debe preprocesar variables categóricas antes de usar modelos que requieran vectores numéricos.")

    assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")

    # Transformar label si es string
    if dict(df.dtypes)[label_col] not in ('int', 'double', 'float', 'bigint'):
        indexer = StringIndexer(inputCol=label_col, outputCol="label_indexed", handleInvalid='keep')
        label_used = "label_indexed"
        pipeline_stages = [indexer, assembler]
    else:
        label_used = label_col
        pipeline_stages = [assembler]

    # Add logistic regression to pipeline
    lr = LogisticRegression(featuresCol="features", labelCol=label_used, predictionCol="prediction", maxIter=20)
    pipeline_stages.append(lr)
    pipeline = Pipeline(stages=pipeline_stages)

    # Split data
    train, test = df.randomSplit([0.7, 0.3], seed=42)
    print("Train/Test counts:", train.count(), test.count())

    # Fit model
    model = pipeline.fit(train)
    print("Modelo entrenado.")

    # Predicciones
    preds = model.transform(test)
    print("Predicciones realizadas. Ejemplo:")
    display(preds.select(label_used, "prediction", "probability").limit(10).toPandas())

    # Evaluación
    # Si problema binario usamos BinaryClassificationEvaluator sobre areaUnderROC
    try:
        evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol=label_used, metricName="areaUnderROC")
        auc = evaluator.evaluate(preds)
        print("AUC (areaUnderROC):", auc)
    except Exception as e:
        print("No se pudo calcular AUC (posiblemente label no binaria). Error:", e)

    # Matriz de confusión (conteos por etiqueta/predicción)
    cm = preds.groupBy(label_used, "prediction").count().orderBy(label_used, "prediction")
    print("Matriz de confusión (conteos):")
    display(cm.toPandas())

else:
    print("No hay columna etiqueta definida. Edite `label_col` en esta celda para continuar con el pipeline de ML.")


## Conclusiones y pasos siguientes

- El notebook cubre la carga, exploración, transformaciones con RDDs, operaciones con DataFrames, consultas SQL y un pipeline de ejemplo con MLlib.
- **Sugerencias para ejecución real:**
  - Asegúrese de ejecutar en un entorno con `pyspark` y memoria adecuada.  
  - Revise/ajuste los nombres de columnas (`origen`, `destino`, `razon`, `region`, `label`) según su archivo real.  
  - Para variables categóricas, incluya `StringIndexer` + `OneHotEncoder` según convenga.  
  - Para grandes datasets, evite `collect()` y prefiera `limit()` o `take()`.

Si quiere, puedo:  
- Adaptarlo para ejecutar con `pandas` si prefiere (más liviano para computadoras locales sin Spark).