In [187]:
import $ivy.`org.apache.spark::spark-sql:3.5.6`
import $ivy.`org.apache.spark::spark-core:3.5.6`
import $ivy.`org.postgresql:postgresql:42.7.3`  // Driver JDBC de PostgreSQL
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{DataFrame, SaveMode}

// Importar todas las funciones necesarias
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("PostgreSQL-Notebook")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()

import spark.implicits._

// Configurar nivel de logging
spark.sparkContext.setLogLevel("ERROR")  // Solo muestra errores
// O alternativamente:
// spark.sparkContext.setLogLevel("WARN")   // Solo warnings y errores
// spark.sparkContext.setLogLevel("OFF")    // Sin logs

// Configuración de conexión a PostgreSQL
val jdbcUrl = "jdbc:postgresql://localhost:5432/postgres"
val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", "postgres")
connectionProperties.setProperty("password", "postgres")
connectionProperties.setProperty("driver", "org.postgresql.Driver")

println("✅ Spark configurado y listo para conectar con PostgreSQL")

✅ Spark configurado y listo para conectar con PostgreSQL



[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36morg.apache.spark.sql.SparkSession[39m
[32mimport [39m[36morg.apache.spark.sql.{DataFrame, SaveMode}[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@16ab0c5f
[32mimport [39m[36mspark.implicits._[39m
[36mjdbcUrl[39m: [32mString[39m = [32m"jdbc:postgresql://localhost:5432/postgres"[39m
[36mconnectionProperties[39m: [32mjava[39m.[32mutil[39m.[32mProperties[39m = {user=postgres, password=postgres, driver=org.postgresql.Driver}
[36mres187_11[39m: [32mObject[39m = [32mnull[39m
[36mres187_12[39m: [32mObject[39m = [32mnull[39m
[36mres187_13[39m: [32mObject[39m = [32mnull[39m

In [188]:
// Función para leer tabla desde PostgreSQL
def leerTablaPostgres(tableName: String): DataFrame = {
  spark.read
    .jdbc(jdbcUrl, tableName, connectionProperties)
}

// Función para escribir DataFrame a PostgreSQL
def escribirTablaPostgres(df: DataFrame, tableName: String, mode: SaveMode = SaveMode.Overwrite): Unit = {
  df.write
    .mode(mode)
    .jdbc(jdbcUrl, tableName, connectionProperties)
}

// Función para ejecutar query SQL directamente
def ejecutarQuery(query: String): DataFrame = {
  spark.read
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("query", query)
    .option("user", "postgres")
    .option("password", "postgres")
    .option("driver", "org.postgresql.Driver")
    .load()
}

println("🔧 Funciones de PostgreSQL definidas")

🔧 Funciones de PostgreSQL definidas



defined [32mfunction[39m [36mleerTablaPostgres[39m
defined [32mfunction[39m [36mescribirTablaPostgres[39m
defined [32mfunction[39m [36mejecutarQuery[39m

In [189]:
// Probar la conexión
try {
  /*
  // Listar todas las tablas
  val tablas = ejecutarQuery("""
    SELECT table_name 
    FROM information_schema.tables 
    WHERE table_schema = 'public'
  """)
  
  println("📋 Tablas disponibles en PostgreSQL:")
  tablas.show()
  */
  
  val miTabla = leerTablaPostgres("postulantes")
  println(s"📊 Registros en la tabla: ${miTabla.count()}")
  miTabla.show(5)
  
} catch {
  case e: Exception => 
    println(s"❌ Error conectando a PostgreSQL: ${e.getMessage}")
    println("🔍 Verifica que PostgreSQL esté ejecutándose y las credenciales sean correctas")
}

📊 Registros en la tabla: 4928

+-----------+--------------------+--------------------+--------------------+-----------------+------+------------+-------------+---------+-------------+-------------------+----------------+---------------+---------+---------------+-------------+-----------+----+---------+----+-------+---+-------+-----+------------------+--------------+---------+
|fecha_corte|                uuid|             escuela|           modalidad|annio_postulacion|ubigeo|departamento|    provincia| distrito|ubigeo_origen|departamento_origen|provincia_origen|distrito_origen|ubigeo_ie|departamento_ie| provincia_ie|distrito_ie|edad|     sexo|pago| idioma|lee|escribe|habla|tiene_discapacidad|tipo_comunidad|comunidad|
+-----------+--------------------+--------------------+--------------------+-----------------+------+------------+-------------+---------+-------------+-------------------+----------------+---------------+---------+---------------+-------------+-----------+----+---------+-

In [190]:
// PASO 1: Cargar datos y crear vista temporal
try {
  val miTabla = leerTablaPostgres("postulantes")
  println(s"📊 Registros en la tabla: ${miTabla.count()}")
  
  // Crear vista temporal para usar con spark.sql()
  miTabla.createOrReplaceTempView("postulantes")
  println("✅ Vista temporal 'postulantes' creada")
  
  // Importar funciones necesarias
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.expressions.Window
  
  println("🔧 Funciones importadas y listas para MapReduce")
  
} catch {
  case e: Exception => 
    println(s"❌ Error: ${e.getMessage}")
}

📊 Registros en la tabla: 4928
✅ Vista temporal 'postulantes' creada
🔧 Funciones importadas y listas para MapReduce

✅ Vista temporal 'postulantes' creada
🔧 Funciones importadas y listas para MapReduce


# 📊 ANÁLISIS MAPREDUCE CON SPARK-SCALA

Este notebook implementa análisis de datos usando **MapReduce** con Spark-Scala sobre el dataset de postulantes universitarios.

## 🎯 Objetivos del análisis:
1. **5 consultas** con tres campos o más usando MapReduce
2. **Agrupaciones** con máximos y mínimos por categorías
3. **Estadísticas completas** (promedio, mediana, desviación estándar)
4. **5 consultas con decimales** que impliquen múltiples operaciones MapReduce

---

## 🔍 SECCIÓN 1: CINCO CONSULTAS CON TRES CAMPOS O MÁS

Esta sección implementa consultas que utilizan **map()**, **reduceByKey()** y **groupByKey()** para analizar múltiples dimensiones del dataset simultáneamente.

In [191]:
// CONSULTA 1: Cantidad de postulantes por carrera, modalidad y sexo
println("🔍 CONSULTA 1: Postulantes por carrera, modalidad y sexo")

// Usar la vista temporal y convertir a RDD para MapReduce
val consulta1 = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val escuela = row.getAs[String]("escuela")
    val modalidad = row.getAs[String]("modalidad")
    val sexo = row.getAs[String]("sexo")
    // Crear clave compuesta de 3 campos
    ((escuela, modalidad, sexo), 1)
  })
  .reduceByKey(_ + _)  // Sumar conteos por clave
  .map { case ((escuela, modalidad, sexo), count) => 
    (escuela, modalidad, sexo, count) 
  }
  .toDF("escuela", "modalidad", "sexo", "cantidad")
  .orderBy($"cantidad".desc)

consulta1.show(10)
println(s"📈 Total de combinaciones encontradas: ${consulta1.count()}")

🔍 CONSULTA 1: Postulantes por carrera, modalidad y sexo

+--------------------+--------------------+---------+--------+
|             escuela|           modalidad|     sexo|cantidad|
+--------------------+--------------------+---------+--------+
|    INGENIERÍA CIVIL|           ORDINARIO|MASCULINO|     812|
|INGENIERÍA AGRONÓ...|           ORDINARIO|MASCULINO|     517|
|    INGENIERÍA CIVIL|PRIMERA OPORTUNID...|MASCULINO|     291|
|INGENIERÍA AGRONÓ...|           ORDINARIO| FEMENINO|     282|
|    INGENIERÍA CIVIL|           ORDINARIO| FEMENINO|     237|
|INGENIERÍA AGRONÓ...|PRIMERA OPORTUNID...|MASCULINO|     221|
|INGENIERÍA DE ALI...|           ORDINARIO| FEMENINO|     210|
|INGENIERÍA DE ALI...|           ORDINARIO|MASCULINO|     183|
|INGENIERÍA AGRONÓ...|EXTRAORDINARIO-CO...|MASCULINO|     165|
|INGENIERÍA AGRONÓ...|PRIMERA OPORTUNID...| FEMENINO|     161|
+--------------------+--------------------+---------+--------+
only showing top 10 rows

+--------------------+-------------

[36mconsulta1[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [escuela: string, modalidad: string ... 2 more fields]

In [192]:
// CONSULTA 2: Suma de pagos por departamento, provincia y año de postulación
println("\n🔍 CONSULTA 2: Suma de pagos por departamento, provincia y año")

val consulta2 = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val dept = row.getAs[String]("departamento")
    val prov = row.getAs[String]("provincia")
    val anio = row.getAs[Integer]("annio_postulacion")
    val pago = row.getAs[Integer]("pago")
    // Crear clave compuesta y valor numérico
    ((dept, prov, anio), if (pago != null) pago.toDouble else 0.0)
  })
  .reduceByKey(_ + _)  // Sumar pagos por ubicación y año
  .map { case ((dept, prov, anio), suma) => 
    (dept, prov, anio, suma) 
  }
  .toDF("departamento", "provincia", "anio_postulacion", "suma_pagos")
  .orderBy($"suma_pagos".desc)

consulta2.show(10)
println(f"💰 Total recaudado: S/. ${consulta2.agg(sum($"suma_pagos")).collect()(0)(0)}")


🔍 CONSULTA 2: Suma de pagos por departamento, provincia y año

+------------+-------------+----------------+----------+
|departamento|    provincia|anio_postulacion|suma_pagos|
+------------+-------------+----------------+----------+
|       CUSCO|LA CONVENCIÓN|            2023|  185670.0|
|       CUSCO|LA CONVENCIÓN|            2020|  140500.0|
|       CUSCO|LA CONVENCIÓN|            2019|  139950.0|
|       CUSCO|LA CONVENCIÓN|            2022|  108700.0|
|       CUSCO|LA CONVENCIÓN|            2021|  105440.0|
|       CUSCO|        CUSCO|            2023|   20710.0|
|       CUSCO|        CUSCO|            2021|   12550.0|
|       CUSCO|        CUSCO|            2019|   11000.0|
|       CUSCO|        CUSCO|            2020|   10400.0|
|       CUSCO|        CUSCO|            2022|    8850.0|
+------------+-------------+----------------+----------+
only showing top 10 rows

+------------+-------------+----------------+----------+
|departamento|    provincia|anio_postulacion|suma_pagos

[36mconsulta2[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [departamento: string, provincia: string ... 2 more fields]

In [193]:
// CONSULTA 3: Postulantes por distrito origen, distrito IE y tipo de comunidad
println("\n🔍 CONSULTA 3: Postulantes por distrito origen, distrito IE y tipo de comunidad")

val consulta3 = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val distOrigen = row.getAs[String]("distrito_origen")
    val distIE = row.getAs[String]("distrito_ie")
    val tipoComunidad = row.getAs[String]("tipo_comunidad")
    // Analizar flujo geográfico y tipo de comunidad
    ((distOrigen, distIE, tipoComunidad), 1)
  })
  .reduceByKey(_ + _)
  .map { case ((distOrigen, distIE, tipoComunidad), count) => 
    (distOrigen, distIE, tipoComunidad, count) 
  }
  .toDF("distrito_origen", "distrito_ie", "tipo_comunidad", "cantidad")
  .orderBy($"cantidad".desc)

consulta3.show(10)
println("🌍 Análisis de movilidad geográfica por tipo de comunidad completado")


🔍 CONSULTA 3: Postulantes por distrito origen, distrito IE y tipo de comunidad

+---------------+-----------+--------------+--------+
|distrito_origen|distrito_ie|tipo_comunidad|cantidad|
+---------------+-----------+--------------+--------+
|      SANTA ANA|  SANTA ANA|       NINGUNA|    1354|
|      SANTA ANA|  SANTA ANA|          NULL|     367|
|     VILCABAMBA| VILCABAMBA|       NINGUNA|     349|
|       ECHARATE|   ECHARATE|       NINGUNA|     316|
|     VILCABAMBA| VILCABAMBA|        ANDINO|     133|
|      MEGANTONI|  MEGANTONI|     AMAZÓNICO|     124|
|      QUELLOUNO|  QUELLOUNO|       NINGUNA|     112|
|      MEGANTONI|  MEGANTONI|       NINGUNA|     101|
|        KIMBIRI|    KIMBIRI|       NINGUNA|      93|
|        PICHARI|    PICHARI|       NINGUNA|      90|
+---------------+-----------+--------------+--------+
only showing top 10 rows

🌍 Análisis de movilidad geográfica por tipo de comunidad completado
+---------------+-----------+--------------+--------+
|distrito_orige

[36mconsulta3[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [distrito_origen: string, distrito_ie: string ... 2 more fields]

In [194]:
// CONSULTA 4: Análisis de habilidades (leer, escribir, hablar) por sexo y departamento
println("\n🔍 CONSULTA 4: Habilidades lingüísticas por sexo y departamento")

val consulta4 = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val sexo = row.getAs[String]("sexo")
    val dept = row.getAs[String]("departamento")
    // Convertir habilidades a valores numéricos
    val lee = if (row.getAs[String]("lee") == "SI") 1 else 0
    val escribe = if (row.getAs[String]("escribe") == "SI") 1 else 0
    val habla = if (row.getAs[String]("habla") == "SI") 1 else 0
    // Clave compuesta con tupla de conteos
    ((sexo, dept), (lee, escribe, habla, 1))
  })
  .reduceByKey { case ((l1, e1, h1, c1), (l2, e2, h2, c2)) => 
    (l1 + l2, e1 + e2, h1 + h2, c1 + c2)  // Sumar todas las habilidades
  }
  .map { case ((sexo, dept), (lee, escribe, habla, total)) => 
    (sexo, dept, lee, escribe, habla, total)
  }
  .toDF("sexo", "departamento", "saben_leer", "saben_escribir", "saben_hablar", "total_postulantes")
  .orderBy($"total_postulantes".desc)

consulta4.show(10)
println("📚 Análisis de competencias lingüísticas por género y región completado")


🔍 CONSULTA 4: Habilidades lingüísticas por sexo y departamento

+---------+-------------+----------+--------------+------------+-----------------+
|     sexo| departamento|saben_leer|saben_escribir|saben_hablar|total_postulantes|
+---------+-------------+----------+--------------+------------+-----------------+
|MASCULINO|        CUSCO|      2082|          1588|        1641|             2721|
| FEMENINO|        CUSCO|      1397|           991|        1022|             1762|
|MASCULINO|     AYACUCHO|        81|            60|          67|               99|
| FEMENINO|     AYACUCHO|        35|            24|          27|               53|
|MASCULINO|         LIMA|        37|            30|          29|               43|
| FEMENINO|         LIMA|        28|            24|          24|               38|
|MASCULINO|MADRE DE DIOS|        25|            20|          20|               32|
|MASCULINO|         PUNO|        21|            15|          15|               26|
|MASCULINO|     APURÍM

[36mconsulta4[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [sexo: string, departamento: string ... 4 more fields]

In [195]:
// CONSULTA 5: Postulantes con discapacidad por modalidad, idioma y rango de edad
println("\n🔍 CONSULTA 5: Postulantes con discapacidad por modalidad, idioma y edad")

val consulta5 = spark.sql("SELECT * FROM postulantes WHERE tiene_discapacidad = 'SI'").rdd
  .map(row => {
    val modalidad = row.getAs[String]("modalidad")
    val idioma = row.getAs[String]("idioma")
    val edad = row.getAs[Integer]("edad")
    // Crear rangos de edad para análisis
    val rangoEdad = if (edad != null) {
      if (edad < 18) "Menor de 18"
      else if (edad <= 25) "18-25"
      else "Mayor de 25"
    } else "Sin datos"
    ((modalidad, idioma, rangoEdad), 1)
  })
  .reduceByKey(_ + _)
  .map { case ((modalidad, idioma, rangoEdad), count) => 
    (modalidad, idioma, rangoEdad, count) 
  }
  .toDF("modalidad", "idioma", "rango_edad", "cantidad_con_discapacidad")
  .orderBy($"cantidad_con_discapacidad".desc)

consulta5.show(10)
println("♿ Análisis de inclusión por modalidad, idioma y edad completado")


🔍 CONSULTA 5: Postulantes con discapacidad por modalidad, idioma y edad

+--------------------+----------+-----------+-------------------------+
|           modalidad|    idioma| rango_edad|cantidad_con_discapacidad|
+--------------------+----------+-----------+-------------------------+
|           ORDINARIO|   Español|      18-25|                       12|
|EXTRAORDINARIO-PE...|   Español|      18-25|                        6|
|PRIMERA OPORTUNID...|   Español|      18-25|                        5|
|EXTRAORDINARIO-CO...|   Quechua|      18-25|                        3|
|EXTRAORDINARIO-PE...|   Quechua|Mayor de 25|                        1|
|PRIMERA OPORTUNID...|   Español|Menor de 18|                        1|
|EXTRAORDINARIO-CO...|Matsigenka|      18-25|                        1|
|           ORDINARIO|   Español|Mayor de 25|                        1|
|EXTRAORDINARIO-PE...|   Quechua|      18-25|                        1|
+--------------------+----------+-----------+-----------------

[36mconsulta5[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [modalidad: string, idioma: string ... 2 more fields]

## 📊 SECCIÓN 2: AGRUPACIONES CON MÁXIMOS Y MÍNIMOS

Esta sección utiliza **groupByKey()** para calcular estadísticas agregadas (máximo, mínimo, promedio) por categorías usando operaciones MapReduce.

In [196]:
// AGRUPACIÓN 1: Pagos máximos, mínimos y promedios por escuela
println("📊 AGRUPACIÓN 1: Estadísticas de pagos por carrera profesional")

val agrupacion1 = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val escuela = row.getAs[String]("escuela")
    val pago = row.getAs[Integer]("pago")
    val pagoValue = if (pago != null) pago.toDouble else 0.0
    (escuela, pagoValue)
  })
  .groupByKey()  // Agrupar todos los pagos por escuela
  .map { case (escuela, pagos) => 
    val pagosList = pagos.toList.filter(_ > 0)  // Filtrar pagos válidos
    if (pagosList.nonEmpty) {
      val maximo = pagosList.max
      val minimo = pagosList.min
      val promedio = pagosList.sum / pagosList.length
      val mediana = {
        val sorted = pagosList.sorted
        val n = sorted.length
        if (n % 2 == 0) (sorted(n/2-1) + sorted(n/2)) / 2.0
        else sorted(n/2)
      }
      (escuela, maximo, minimo, promedio, mediana, pagosList.length)
    } else {
      (escuela, 0.0, 0.0, 0.0, 0.0, 0)
    }
  }
  .toDF("escuela", "pago_maximo", "pago_minimo", "pago_promedio", "pago_mediana", "total_postulantes")
  .orderBy($"pago_maximo".desc)

agrupacion1.show(15, truncate = false)
println("💡 Análisis: Diferencias significativas en pagos entre carreras")

📊 AGRUPACIÓN 1: Estadísticas de pagos por carrera profesional

+------------------------------+-----------+-----------+------------------+------------+-----------------+
|escuela                       |pago_maximo|pago_minimo|pago_promedio     |pago_mediana|total_postulantes|
+------------------------------+-----------+-----------+------------------+------------+-----------------+
|INGENIERÍA CIVIL              |670.0      |50.0       |189.8494983277592 |200.0       |1794             |
|INGENIERÍA DE ALIMENTOS       |670.0      |50.0       |171.30968622100954|200.0       |733              |
|INGENIERÍA AGRONÓMICA TROPICAL|670.0      |50.0       |174.1032428855063 |200.0       |1511             |
|ECOTURISMO                    |300.0      |50.0       |167.36111111111111|200.0       |504              |
|CONTABILIDAD                  |250.0      |200.0      |205.8139534883721 |200.0       |129              |
|ECONOMÍA                      |250.0      |200.0      |206.03864734299518|200.0 

[36magrupacion1[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [escuela: string, pago_maximo: double ... 4 more fields]

In [197]:
// AGRUPACIÓN 2: Edades máximas, mínimas y promedios por departamento
println("\n📊 AGRUPACIÓN 2: Estadísticas de edad por departamento")

val agrupacion2 = spark.sql("SELECT * FROM postulantes WHERE edad IS NOT NULL").rdd
  .map(row => {
    val departamento = row.getAs[String]("departamento")
    val edad = row.getAs[Integer]("edad").toDouble
    (departamento, edad)
  })
  .groupByKey()
  .map { case (departamento, edades) => 
    val edadesList = edades.toList
    val maximo = edadesList.max
    val minimo = edadesList.min
    val promedio = edadesList.sum / edadesList.length
    val mediana = {
      val sorted = edadesList.sorted
      val n = sorted.length
      if (n % 2 == 0) (sorted(n/2-1) + sorted(n/2)) / 2.0
      else sorted(n/2)
    }
    // Calcular desviación estándar
    val varianza = edadesList.map(edad => math.pow(edad - promedio, 2)).sum / edadesList.length
    val desviacion = math.sqrt(varianza)
    
    (departamento, maximo, minimo, promedio, mediana, desviacion, edadesList.length)
  }
  .toDF("departamento", "edad_maxima", "edad_minima", "edad_promedio", "edad_mediana", "desviacion_std", "total_postulantes")
  .orderBy($"edad_promedio".desc)

agrupacion2.show(15)
println("📈 Análisis: Variaciones demográficas por región")


📊 AGRUPACIÓN 2: Estadísticas de edad por departamento

+-------------+-----------+-----------+------------------+------------+------------------+-----------------+
| departamento|edad_maxima|edad_minima|     edad_promedio|edad_mediana|    desviacion_std|total_postulantes|
+-------------+-----------+-----------+------------------+------------+------------------+-----------------+
|MADRE DE DIOS|       38.0|       16.0|23.305555555555557|        19.0| 7.919102938971351|               36|
|       LORETO|       30.0|       17.0|             22.75|        22.0| 4.656984002549289|                4|
|     APURÍMAC|       53.0|       17.0|22.516129032258064|        20.0| 7.602507938245946|               31|
|  LA LIBERTAD|       25.0|       20.0|              22.5|        22.5|               2.5|                2|
|   LAMBAYEQUE|       26.0|       18.0|              22.0|        22.0|               4.0|                2|
|     AYACUCHO|       46.0|       17.0|21.513157894736842|        20.0| 

[36magrupacion2[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [departamento: string, edad_maxima: double ... 5 more fields]

## 📈 SECCIÓN 3: ESTADÍSTICAS DETALLADAS DE CAMPOS NUMÉRICOS

Análisis estadístico completo utilizando **collect()** y operaciones de agregación manual para calcular medidas de tendencia central y dispersión.

In [198]:
// ESTADÍSTICAS COMPLETAS DEL CAMPO EDAD
println("📈 ESTADÍSTICAS COMPLETAS: Análisis del campo EDAD")

// Importar funciones matemáticas
import scala.math.{sqrt, pow}

// Recopilar todos los datos de edad para análisis estadístico
val estadisticasEdad = spark.sql("SELECT edad FROM postulantes WHERE edad IS NOT NULL")
  .rdd
  .map(row => row.getAs[Integer]("edad").toDouble)
  .collect()

// Calcular estadísticas descriptivas
val n = estadisticasEdad.length
val suma = estadisticasEdad.sum
val promedio = suma / n
val maximo = estadisticasEdad.max
val minimo = estadisticasEdad.min

// Calcular mediana
val edadesOrdenadas = estadisticasEdad.sorted
val mediana = if (n % 2 == 0) {
  (edadesOrdenadas(n/2 - 1) + edadesOrdenadas(n/2)) / 2
} else {
  edadesOrdenadas(n/2)
}

// Calcular medidas de dispersión
val varianza = estadisticasEdad.map(edad => pow(edad - promedio, 2)).sum / n
val desviacionEstandar = sqrt(varianza)
val coeficienteVariacion = (desviacionEstandar / promedio) * 100

// Calcular cuartiles
val q1 = edadesOrdenadas((n * 0.25).toInt)
val q3 = edadesOrdenadas((n * 0.75).toInt)
val iqr = q3 - q1

println(f"📊 ESTADÍSTICAS DEL CAMPO EDAD:")
println(f"   📏 Tamaño de muestra: $n registros")
println(f"   📐 Tendencia central:")
println(f"      • Promedio: $promedio%.2f años")
println(f"      • Mediana: $mediana%.2f años")
println(f"   📊 Rango:")
println(f"      • Máximo: $maximo%.0f años")
println(f"      • Mínimo: $minimo%.0f años")
println(f"      • Rango total: ${maximo - minimo}%.0f años")
println(f"   📈 Dispersión:")
println(f"      • Desviación estándar: $desviacionEstandar%.2f años")
println(f"      • Varianza: $varianza%.2f")
println(f"      • Coeficiente de variación: $coeficienteVariacion%.2f%%")
println(f"   📦 Cuartiles:")
println(f"      • Q1 (25%%): $q1%.0f años")
println(f"      • Q3 (75%%): $q3%.0f años")
println(f"      • Rango intercuartílico: $iqr%.0f años")

📈 ESTADÍSTICAS COMPLETAS: Análisis del campo EDAD

📊 ESTADÍSTICAS DEL CAMPO EDAD:
   📏 Tamaño de muestra: 4928 registros
   📐 Tendencia central:
      • Promedio: 20.24 años
      • Mediana: 19.00 años
   📊 Rango:
      • Máximo: 67 años
      • Mínimo: 15 años
      • Rango total: 52 años
   📈 Dispersión:
      • Desviación estándar: 4.16 años
      • Varianza: 17.32
      • Coeficiente de variación: 20.57%
   📦 Cuartiles:
      • Q1 (25%): 18 años
      • Q3 (75%): 21 años
      • Rango intercuartílico: 3 años
📊 ESTADÍSTICAS DEL CAMPO EDAD:
   📏 Tamaño de muestra: 4928 registros
   📐 Tendencia central:
      • Promedio: 20.24 años
      • Mediana: 19.00 años
   📊 Rango:
      • Máximo: 67 años
      • Mínimo: 15 años
      • Rango total: 52 años
   📈 Dispersión:
      • Desviación estándar: 4.16 años
      • Varianza: 17.32
      • Coeficiente de variación: 20.57%
   📦 Cuartiles:
      • Q1 (25%): 18 años
      • Q3 (75%): 21 años
      • Rango intercuartílico: 3 años


[32mimport [39m[36mscala.math.{sqrt, pow}[39m
[36mestadisticasEdad[39m: [32mArray[39m[[32mDouble[39m] = [33mArray[39m(
  [32m20.0[39m,
  [32m20.0[39m,
  [32m19.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m21.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m19.0[39m,
  [32m20.0[39m,
  [32m19.0[39m,
  [32m21.0[39m,
  [32m21.0[39m,
  [32m19.0[39m,
  [32m21.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m21.0[39m,
  [32m19.0[39m,
  [32m19.0[39m,
  [32m19.0[39m,
  [32m21.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m20.0[39m,
  [32m21.0[39m,
  [32m21.0[39m,
  [32m22.0[39m,
  [32m25.0[39m,
  [32m20.0[39m,
  [32m23.0[39m,
  [32m22.0[39m,
  [32m21.0[39m,
  [32m32.0[39m,
...
[36mn[39m: [32mInt[39m = [32m4928[39m
[36msuma[39m: [32mDouble[39m = [32m99727.0[39m
[36mpromedio[39m: [32mDouble[39m = [32m20.236810064935064[39m
[36mmaximo[39m

In [199]:
// ESTADÍSTICAS COMPLETAS DEL CAMPO PAGO
println("\n💰 ESTADÍSTICAS COMPLETAS: Análisis del campo PAGO")

val estadisticasPago = spark.sql("SELECT pago FROM postulantes WHERE pago IS NOT NULL AND pago > 0")
  .rdd
  .map(row => row.getAs[Integer]("pago").toDouble)
  .collect()

val nPago = estadisticasPago.length
val sumaPago = estadisticasPago.sum
val promedioPago = sumaPago / nPago
val maximoPago = estadisticasPago.max
val minimoPago = estadisticasPago.min

// Calcular mediana para pagos
val pagosOrdenados = estadisticasPago.sorted
val medianaPago = if (nPago % 2 == 0) {
  (pagosOrdenados(nPago/2 - 1) + pagosOrdenados(nPago/2)) / 2
} else {
  pagosOrdenados(nPago/2)
}

// Estadísticas de dispersión para pagos
val varianzaPago = estadisticasPago.map(pago => pow(pago - promedioPago, 2)).sum / nPago
val desviacionEstandarPago = sqrt(varianzaPago)
val coeficienteVariacionPago = (desviacionEstandarPago / promedioPago) * 100

// Calcular cuartiles para pagos
val q1Pago = pagosOrdenados((nPago * 0.25).toInt)
val q3Pago = pagosOrdenados((nPago * 0.75).toInt)
val iqrPago = q3Pago - q1Pago

println(f"💰 ESTADÍSTICAS DEL CAMPO PAGO:")
println(f"   📏 Tamaño de muestra: $nPago registros")
println(f"   💵 Tendencia central:")
println(f"      • Promedio: S/. $promedioPago%.2f")
println(f"      • Mediana: S/. $medianaPago%.2f")
println(f"   💸 Rango:")
println(f"      • Máximo: S/. $maximoPago%.0f")
println(f"      • Mínimo: S/. $minimoPago%.0f")
println(f"      • Rango total: S/. ${maximoPago - minimoPago}%.0f")
println(f"   📊 Dispersión:")
println(f"      • Desviación estándar: S/. $desviacionEstandarPago%.2f")
println(f"      • Varianza: $varianzaPago%.2f")
println(f"      • Coeficiente de variación: $coeficienteVariacionPago%.2f%%")
println(f"   📦 Cuartiles:")
println(f"      • Q1 (25%%): S/. $q1Pago%.0f")
println(f"      • Q3 (75%%): S/. $q3Pago%.0f")
println(f"      • Rango intercuartílico: S/. $iqrPago%.0f")


💰 ESTADÍSTICAS COMPLETAS: Análisis del campo PAGO

💰 ESTADÍSTICAS DEL CAMPO PAGO:
   📏 Tamaño de muestra: 4878 registros
   💵 Tendencia central:
      • Promedio: S/. 180.97
      • Mediana: S/. 200.00
   💸 Rango:
      • Máximo: S/. 670
      • Mínimo: S/. 50
      • Rango total: S/. 620
   📊 Dispersión:
      • Desviación estándar: S/. 61.72
      • Varianza: 3808.81
      • Coeficiente de variación: 34.10%
   📦 Cuartiles:
      • Q1 (25%): S/. 200
      • Q3 (75%): S/. 200
      • Rango intercuartílico: S/. 0
💰 ESTADÍSTICAS DEL CAMPO PAGO:
   📏 Tamaño de muestra: 4878 registros
   💵 Tendencia central:
      • Promedio: S/. 180.97
      • Mediana: S/. 200.00
   💸 Rango:
      • Máximo: S/. 670
      • Mínimo: S/. 50
      • Rango total: S/. 620
   📊 Dispersión:
      • Desviación estándar: S/. 61.72
      • Varianza: 3808.81
      • Coeficiente de variación: 34.10%
   📦 Cuartiles:
      • Q1 (25%): S/. 200
      • Q3 (75%): S/. 200
      • Rango intercuartílico: S/. 0


[36mestadisticasPago[39m: [32mArray[39m[[32mDouble[39m] = [33mArray[39m(
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m200.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m300.0[39m,
  [32m300.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
  [32m250.0[39m,
...
[36mnPago[39m: [32mInt[39m = [32m4878[39m
[36msumaPago[39m: [32mDouble[39m = [32m882780.0[39m
[36mpromedioPago[39m: [32mDouble[39m = [32m180.97170971709718[39m
[36mmaximoPago

## 🔢 SECCIÓN 4: CINCO CONSULTAS CON DECIMALES Y MAPREDUCE

Esta sección implementa consultas avanzadas que utilizan **múltiples operaciones MapReduce** para calcular ratios, porcentajes, índices estadísticos y métricas complejas.

In [200]:
// CONSULTA DECIMAL 1: Porcentaje de postulantes por sexo y departamento
println("🔢 CONSULTA DECIMAL 1: Distribución porcentual por sexo y departamento")

// Primer MapReduce: Contar total de postulantes
val totalPostulantes = spark.sql("SELECT COUNT(*) as total FROM postulantes")
  .collect()(0).getAs[Long]("total").toDouble

// Segundo MapReduce: Contar por sexo y departamento
val porcentajeSexoDept = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val sexo = row.getAs[String]("sexo")
    val dept = row.getAs[String]("departamento")
    ((sexo, dept), 1)
  })
  .reduceByKey(_ + _)
  // Tercer MapReduce: Calcular porcentajes
  .map { case ((sexo, dept), count) => 
    val porcentaje = (count.toDouble / totalPostulantes) * 100.0
    val porcentajeRedondeado = BigDecimal(porcentaje).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble
    (sexo, dept, count, porcentajeRedondeado)
  }
  .toDF("sexo", "departamento", "cantidad", "porcentaje")
  .orderBy(col("porcentaje").desc)

porcentajeSexoDept.show(15)
// Corregir el error de tipos usando getAs[Double]
val sumaPorcentajes = porcentajeSexoDept.agg(sum(col("porcentaje"))).collect()(0).getAs[Double](0)
println(f"✅ Verificación: Suma de porcentajes = $sumaPorcentajes%.1f%%")

🔢 CONSULTA DECIMAL 1: Distribución porcentual por sexo y departamento

+---------+-------------+--------+----------+
|     sexo| departamento|cantidad|porcentaje|
+---------+-------------+--------+----------+
|MASCULINO|        CUSCO|    2721|    55.215|
| FEMENINO|        CUSCO|    1762|    35.755|
|MASCULINO|     AYACUCHO|      99|     2.009|
| FEMENINO|     AYACUCHO|      53|     1.075|
|MASCULINO|         LIMA|      43|     0.873|
| FEMENINO|         LIMA|      38|     0.771|
|MASCULINO|MADRE DE DIOS|      32|     0.649|
|MASCULINO|         PUNO|      26|     0.528|
|MASCULINO|     APURÍMAC|      20|     0.406|
| FEMENINO|     AREQUIPA|      15|     0.304|
|MASCULINO|     AREQUIPA|      15|     0.304|
| FEMENINO|         PUNO|      14|     0.284|
| FEMENINO|     APURÍMAC|      11|     0.223|
| FEMENINO|      UCAYALI|      10|     0.203|
|MASCULINO|        JUNÍN|      10|     0.203|
+---------+-------------+--------+----------+
only showing top 15 rows

+---------+-------------+----

[36mtotalPostulantes[39m: [32mDouble[39m = [32m4928.0[39m
[36mporcentajeSexoDept[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [sexo: string, departamento: string ... 2 more fields]
[36msumaPorcentajes[39m: [32mDouble[39m = [32m99.99800000000002[39m

In [201]:
// CONSULTA DECIMAL 2: Ratio de pagos promedio entre géneros por escuela
println("\n🔢 CONSULTA DECIMAL 2: Ratio de pagos masculino/femenino por carrera")

// Primer MapReduce: Calcular pagos promedio por género y escuela
val pagosPromedioGenero = spark.sql("SELECT * FROM postulantes WHERE pago IS NOT NULL AND pago > 0").rdd
  .map(row => {
    val escuela = row.getAs[String]("escuela")
    val sexo = row.getAs[String]("sexo")
    val pago = row.getAs[Integer]("pago").toDouble
    ((escuela, sexo), (pago, 1))
  })
  .reduceByKey { case ((pago1, count1), (pago2, count2)) => 
    (pago1 + pago2, count1 + count2) 
  }
  // Segundo MapReduce: Calcular promedios
  .map { case ((escuela, sexo), (sumaPagos, count)) => 
    val promedio = sumaPagos / count
    (escuela, (sexo, promedio))
  }
  // Tercer MapReduce: Agrupar por escuela y calcular ratios
  .groupByKey()
  .map { case (escuela, generos) => 
    val generosMap = generos.toMap
    val masculino = generosMap.getOrElse("MASCULINO", 0.0)
    val femenino = generosMap.getOrElse("FEMENINO", 0.0)
    val ratio = if (femenino > 0) {
      BigDecimal(masculino / femenino).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble
    } else 0.0
    val diferencia = masculino - femenino
    (escuela, masculino, femenino, ratio, diferencia)
  }
  .toDF("escuela", "pago_prom_masculino", "pago_prom_femenino", "ratio_m_f", "diferencia_pagos")
  .orderBy(col("ratio_m_f").desc)

pagosPromedioGenero.show(10, truncate = false)
println("📊 Ratio > 1.0 indica que hombres pagan más; Ratio < 1.0 indica que mujeres pagan más")


🔢 CONSULTA DECIMAL 2: Ratio de pagos masculino/femenino por carrera

+------------------------------+-------------------+------------------+---------+-------------------+
|escuela                       |pago_prom_masculino|pago_prom_femenino|ratio_m_f|diferencia_pagos   |
+------------------------------+-------------------+------------------+---------+-------------------+
|ECOTURISMO                    |171.36150234741785 |164.43298969072166|1.0421   |6.928512656696199  |
|INGENIERÍA AGRONÓMICA TROPICAL|175.58539205155748 |171.72413793103448|1.0225   |3.861254120523     |
|CONTABILIDAD                  |207.69230769230768 |203.90625         |1.0186   |3.786057692307679  |
|ECONOMÍA                      |206.14754098360655 |205.88235294117646|1.0013   |0.26518804243008276|
|INGENIERÍA CIVIL              |188.75471698113208 |192.94243070362472|0.9783   |-4.187713722492646 |
|INGENIERÍA DE ALIMENTOS       |168.35443037974684 |173.54916067146283|0.9701   |-5.1947302917159845|
+-----------

[36mpagosPromedioGenero[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [escuela: string, pago_prom_masculino: double ... 3 more fields]

In [202]:
// CONSULTA DECIMAL 3: Índice de diversidad de carreras por departamento (Shannon)
println("\n🔢 CONSULTA DECIMAL 3: Índice de diversidad académica por departamento")

// Primer MapReduce: Contar postulantes por carrera y departamento
val diversidadCarreras = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val departamento = row.getAs[String]("departamento")
    val escuela = row.getAs[String]("escuela")
    ((departamento, escuela), 1)
  })
  .reduceByKey(_ + _)
  // Segundo MapReduce: Agrupar por departamento
  .map { case ((departamento, escuela), count) => 
    (departamento, (escuela, count)) 
  }
  .groupByKey()
  // Tercer MapReduce: Calcular índice de Shannon
  .map { case (departamento, carreras) => 
    val carrerasList = carreras.toList
    val totalPostulantes = carrerasList.map(_._2).sum.toDouble
    val numeroCarreras = carrerasList.length.toDouble
    
    // Calcular índice de Shannon (diversidad)
    val indiceShannon = carrerasList.map { case (_, count) =>
      val proporcion = count.toDouble / totalPostulantes
      if (proporcion > 0) -proporcion * math.log(proporcion) else 0.0
    }.sum
    
    // Calcular índice de uniformidad (0-1)
    val uniformidad = if (numeroCarreras > 1) {
      indiceShannon / math.log(numeroCarreras)
    } else 0.0
    
    // Redondear valores
    val shannonRedondeado = BigDecimal(indiceShannon).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble
    val uniformidadRedondeada = BigDecimal(uniformidad).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble
    
    (departamento, numeroCarreras.toInt, totalPostulantes.toInt, shannonRedondeado, uniformidadRedondeada)
  }
  .toDF("departamento", "num_carreras", "total_postulantes", "indice_shannon", "uniformidad")
  .orderBy(col("indice_shannon").desc)

diversidadCarreras.show(10)
println("📈 Índice Shannon: Mayor valor = mayor diversidad académica")
println("📊 Uniformidad: 1.0 = perfecta distribución, 0.0 = concentración total")


🔢 CONSULTA DECIMAL 3: Índice de diversidad académica por departamento

+-------------+------------+-----------------+--------------+-----------+
| departamento|num_carreras|total_postulantes|indice_shannon|uniformidad|
+-------------+------------+-----------------+--------------+-----------+
|         LIMA|           6|               81|         1.482|     0.8271|
|        CUSCO|           6|             4483|        1.4764|      0.824|
|     APURÍMAC|           6|               31|        1.4728|      0.822|
|        JUNÍN|           5|               16|        1.4615|     0.9081|
|      UCAYALI|           5|               19|        1.4122|     0.8775|
|     AYACUCHO|           6|              152|        1.3983|     0.7804|
|         PUNO|           6|               40|        1.3473|     0.7519|
|     AREQUIPA|           6|               30|        1.3449|     0.7506|
|MADRE DE DIOS|           5|               36|        1.2492|     0.7761|
|        TACNA|           3|            

[36mdiversidadCarreras[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [departamento: string, num_carreras: int ... 3 more fields]

In [203]:
// CONSULTA DECIMAL 4: Coeficiente de variación de edades por modalidad
println("\n🔢 CONSULTA DECIMAL 4: Variabilidad de edades por modalidad de ingreso")

// Primer MapReduce: Agrupar edades por modalidad
val coeficienteVariacion = spark.sql("SELECT * FROM postulantes WHERE edad IS NOT NULL").rdd
  .map(row => {
    val modalidad = row.getAs[String]("modalidad")
    val edad = row.getAs[Integer]("edad").toDouble
    (modalidad, edad)
  })
  .groupByKey()
  // Segundo MapReduce: Calcular estadísticas por modalidad
  .map { case (modalidad, edades) => 
    val edadesList = edades.toList
    val n = edadesList.length.toDouble
    val promedio = edadesList.sum / n
    
    // Calcular varianza y desviación estándar
    val varianza = edadesList.map(edad => pow(edad - promedio, 2)).sum / n
    val desviacion = sqrt(varianza)
    
    // Calcular coeficiente de variación (CV)
    val coefVariacion = if (promedio > 0) {
      (desviacion / promedio) * 100.0
    } else 0.0
    
    // Redondear valores
    val promedioRedondeado = BigDecimal(promedio).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
    val desviacionRedondeada = BigDecimal(desviacion).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble
    val cvRedondeado = BigDecimal(coefVariacion).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
    
    (modalidad, n.toInt, promedioRedondeado, desviacionRedondeada, cvRedondeado)
  }
  .toDF("modalidad", "total_postulantes", "edad_promedio", "desviacion_estandar", "coef_variacion_pct")
  .orderBy(col("coef_variacion_pct").desc)

coeficienteVariacion.show(10, truncate = false)
println("📊 CV < 15%: Baja variabilidad | CV 15-35%: Moderada | CV > 35%: Alta variabilidad")


🔢 CONSULTA DECIMAL 4: Variabilidad de edades por modalidad de ingreso

+-----------------------------------------------------------------+-----------------+-------------+-------------------+------------------+
|modalidad                                                        |total_postulantes|edad_promedio|desviacion_estandar|coef_variacion_pct|
+-----------------------------------------------------------------+-----------------+-------------+-------------------+------------------+
|EXTRAORDINARIO-PERSONAS CON DISCAPACIDAD                         |15               |25.6         |7.126              |27.83             |
|EXTRAORDINARIO-GRADUADOS Y TITULADOS                             |9                |32.44        |7.455              |22.98             |
|ORDINARIO                                                        |2844             |20.78        |4.662              |22.44             |
|EXTRAORDINARIO-COMUNIDADES ANDINO AMAZÓNICOS                     |754              |20.67    

[36mcoeficienteVariacion[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [modalidad: string, total_postulantes: int ... 3 more fields]

In [204]:
// CONSULTA DECIMAL 5: Tasa de conversión académica (postulantes por carrera vs capacidad)
println("\n🔢 CONSULTA DECIMAL 5: Densidad de postulación y competitividad por carrera")

// Simulamos capacidades máximas por carrera (en un caso real vendría de otra tabla)
val capacidadesCarrera = Map(
  "INGENIERÍA CIVIL" -> 150,
  "INGENIERÍA AGRONÓMICA" -> 120,
  "INGENIERÍA DE ALIMENTOS" -> 80,
  "ECOTURISMO" -> 60,
  "ARQUITECTURA" -> 100
)

// Primer MapReduce: Contar postulantes por carrera
val competitividadCarreras = spark.sql("SELECT * FROM postulantes").rdd
  .map(row => {
    val escuela = row.getAs[String]("escuela")
    (escuela, 1)
  })
  .reduceByKey(_ + _)
  // Segundo MapReduce: Calcular métricas de competitividad
  .map { case (escuela, totalPostulantes) => 
    val capacidad = capacidadesCarrera.getOrElse(escuela, 50) // Capacidad por defecto
    
    // Calcular métricas decimales
    val ratioCompetencia = totalPostulantes.toDouble / capacidad.toDouble
    val porcentajeOcupacion = (capacidad.toDouble / totalPostulantes.toDouble) * 100.0
    val indiceSelectividad = if (totalPostulantes > 0) {
      (capacidad.toDouble / totalPostulantes.toDouble) * 100.0
    } else 0.0
    
    // Redondear valores
    val ratioRedondeado = BigDecimal(ratioCompetencia).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble
    val ocupacionRedondeada = BigDecimal(porcentajeOcupacion).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
    val selectividadRedondeada = BigDecimal(indiceSelectividad).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
    
    (escuela, totalPostulantes, capacidad, ratioRedondeado, ocupacionRedondeada, selectividadRedondeada)
  }
  .toDF("escuela", "total_postulantes", "capacidad_estimada", "ratio_competencia", "pct_ocupacion", "indice_selectividad")
  .orderBy(col("ratio_competencia").desc)

competitividadCarreras.show(15, truncate = false)
println("📊 Ratio Competencia: Postulantes por vacante disponible")
println("📈 % Ocupación: Porcentaje de capacidad utilizada")
println("🎯 Índice Selectividad: % de estudiantes que pueden ser admitidos")


🔢 CONSULTA DECIMAL 5: Densidad de postulación y competitividad por carrera

+------------------------------+-----------------+------------------+-----------------+-------------+-------------------+
|escuela                       |total_postulantes|capacidad_estimada|ratio_competencia|pct_ocupacion|indice_selectividad|
+------------------------------+-----------------+------------------+-----------------+-------------+-------------------+
|INGENIERÍA AGRONÓMICA TROPICAL|1529             |50                |30.58            |3.27         |3.27               |
|INGENIERÍA CIVIL              |1813             |150               |12.087           |8.27         |8.27               |
|INGENIERÍA DE ALIMENTOS       |744              |80                |9.3              |10.75        |10.75              |
|ECOTURISMO                    |506              |60                |8.433            |11.86        |11.86              |
|ECONOMÍA                      |207              |50                |

[36mcapacidadesCarrera[39m: [32mMap[39m[[32mString[39m, [32mInt[39m] = [33mMap[39m(
  [32m"INGENIERÍA DE ALIMENTOS"[39m -> [32m80[39m,
  [32m"ECOTURISMO"[39m -> [32m60[39m,
  [32m"INGENIERÍA CIVIL"[39m -> [32m150[39m,
  [32m"INGENIERÍA AGRONÓMICA"[39m -> [32m120[39m,
  [32m"ARQUITECTURA"[39m -> [32m100[39m
)
[36mcompetitividadCarreras[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [escuela: string, total_postulantes: int ... 4 more fields]

## 🔍 SECCIÓN 5: CONSULTAS BÁSICAS CON SPARK SQL

Implementación de operaciones fundamentales usando DataFrame API y Spark SQL.

In [205]:
// CONSULTA BÁSICA 1: Mostrar columnas específicas
println("🔍 CONSULTA BÁSICA 1: Selección de columnas específicas")

// Seleccionar solo algunas columnas
val columnasEspecificas = spark.sql("SELECT * FROM postulantes")
  .select("uuid", "escuela", "modalidad", "sexo", "edad", "departamento")

columnasEspecificas.show(10)
println(s"📊 Total de registros: ${columnasEspecificas.count()}")

// Alternativa usando DataFrame API
val seleccionColumnas = spark.sql("SELECT * FROM postulantes").select(
  col("uuid"),
  col("escuela"),
  col("modalidad"),
  col("pago"),
  col("edad")
)

println("\n📋 Usando DataFrame API:")
seleccionColumnas.show(5)

🔍 CONSULTA BÁSICA 1: Selección de columnas específicas

+--------------------+--------------------+--------------------+---------+----+------------+
|                uuid|             escuela|           modalidad|     sexo|edad|departamento|
+--------------------+--------------------+--------------------+---------+----+------------+
|eb35ae9c84a9e2393...|    INGENIERÍA CIVIL|PRIMERA OPORTUNID...|MASCULINO|  20|       CUSCO|
|37aa167373b8ddd18...|          ECOTURISMO|PRIMERA OPORTUNID...|MASCULINO|  20|       CUSCO|
|41b4b33d0e004fc7d...|    INGENIERÍA CIVIL|PRIMERA OPORTUNID...|MASCULINO|  19|       CUSCO|
|60d7c1c310363c4b2...|INGENIERÍA AGRONÓ...|PRIMERA OPORTUNID...|MASCULINO|  20|       CUSCO|
|28a9a7dc2ae0a4c9d...|INGENIERÍA DE ALI...|PRIMERA OPORTUNID...| FEMENINO|  20|       CUSCO|
|b0139127f9cb989ac...|    INGENIERÍA CIVIL|PRIMERA OPORTUNID...| FEMENINO|  21|       CUSCO|
|112ebb674090e2d3d...|    INGENIERÍA CIVIL|PRIMERA OPORTUNID...|MASCULINO|  20|       CUSCO|
|90a484b69506d

[36mcolumnasEspecificas[39m: [32mDataFrame[39m = [uuid: string, escuela: string ... 4 more fields]
[36mseleccionColumnas[39m: [32mDataFrame[39m = [uuid: string, escuela: string ... 3 more fields]

In [206]:
// CONSULTA BÁSICA 2: Utilizar el comando filter
println("\n🔍 CONSULTA BÁSICA 2: Aplicando filtros")

// Filtrar postulantes de INGENIERÍA CIVIL con pago mayor a 100
val filtroCarrera = spark.sql("SELECT * FROM postulantes")
  .filter(col("escuela") === "INGENIERÍA CIVIL")
  .filter(col("pago") > 100)
  .filter(col("edad").between(18, 25))

filtroCarrera.show(10)
println(s"📊 Postulantes de Ingeniería Civil (18-25 años, pago > 100): ${filtroCarrera.count()}")

// Filtros múltiples con condiciones complejas
val filtroComplejo = spark.sql("SELECT * FROM postulantes")
  .filter(
    col("sexo") === "FEMENINO" && 
    col("departamento").isin("LIMA", "AREQUIPA", "CUSCO") &&
    col("tiene_discapacidad") === "NO"
  )

println(f"\n👩 Mujeres sin discapacidad de Lima, Arequipa o Cusco: ${filtroComplejo.count()}")
filtroComplejo.select("escuela", "departamento", "modalidad").show(10)


🔍 CONSULTA BÁSICA 2: Aplicando filtros

+-----------+--------------------+----------------+--------------------+-----------------+------+------------+-------------+--------------------+-------------+-------------------+----------------+---------------+---------+---------------+-------------+-----------+----+---------+----+-------+---+-------+-----+------------------+--------------+---------+
|fecha_corte|                uuid|         escuela|           modalidad|annio_postulacion|ubigeo|departamento|    provincia|            distrito|ubigeo_origen|departamento_origen|provincia_origen|distrito_origen|ubigeo_ie|departamento_ie| provincia_ie|distrito_ie|edad|     sexo|pago| idioma|lee|escribe|habla|tiene_discapacidad|tipo_comunidad|comunidad|
+-----------+--------------------+----------------+--------------------+-----------------+------+------------+-------------+--------------------+-------------+-------------------+----------------+---------------+---------+---------------+-----------

[36mfiltroCarrera[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [fecha_corte: int, uuid: string ... 25 more fields]
[36mfiltroComplejo[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [fecha_corte: int, uuid: string ... 25 more fields]

In [207]:
// CONSULTA BÁSICA 3: Mostrar información ordenada
println("\n🔍 CONSULTA BÁSICA 3: Información ordenada")

// Ordenar por múltiples columnas
val datosOrdenados = spark.sql("SELECT * FROM postulantes")
  .select("uuid", "escuela", "edad", "pago", "departamento")
  .orderBy(col("pago").desc, col("edad").asc)

println("📊 Postulantes ordenados por pago (desc) y edad (asc):")
datosOrdenados.show(15)

// Ordenar por departamento y mostrar estadísticas
val ordenadoPorRegion = spark.sql("SELECT * FROM postulantes")
  .select("departamento", "escuela", "sexo", "modalidad")
  .orderBy(col("departamento").asc, col("escuela").asc)

println("\n🌍 Ordenado por región y carrera:")
ordenadoPorRegion.show(10)


🔍 CONSULTA BÁSICA 3: Información ordenada
📊 Postulantes ordenados por pago (desc) y edad (asc):

📊 Postulantes ordenados por pago (desc) y edad (asc):
+--------------------+--------------------+----+----+------------+
|                uuid|             escuela|edad|pago|departamento|
+--------------------+--------------------+----+----+------------+
|10c78e221144ae1a4...|INGENIERÍA AGRONÓ...|  23| 670|       CUSCO|
|a2b9bb2f6cada7b14...|INGENIERÍA DE ALI...|  24| 670|       CUSCO|
|ad73d49724aff6044...|    INGENIERÍA CIVIL|  24| 670|       CUSCO|
|9ccd26197f39a7bd5...|    INGENIERÍA CIVIL|  30| 670|       CUSCO|
|9955eb96199807478...|    INGENIERÍA CIVIL|  31| 670|       CUSCO|
|b1ce2631353d05962...|    INGENIERÍA CIVIL|  36| 670|       CUSCO|
|47a8d23129f81caf5...|    INGENIERÍA CIVIL|  40| 670|    AYACUCHO|
|ac6f6a84abb226401...|    INGENIERÍA CIVIL|  40| 670|    APURÍMAC|
|4c7d00d8349d30f5d...|    INGENIERÍA CIVIL|  44| 670|       CUSCO|
|dd5ed3ed82dc53a81...|    INGENIERÍA CIVIL| 

[36mdatosOrdenados[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, escuela: string ... 3 more fields]
[36mordenadoPorRegion[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [departamento: string, escuela: string ... 2 more fields]

In [208]:
// CONSULTA BÁSICA 4: Utilizar groupBy y count
println("\n🔍 CONSULTA BÁSICA 4: Agrupaciones con count")

// Contar postulantes por carrera
val conteoCarreras = spark.sql("SELECT * FROM postulantes")
  .groupBy("escuela")
  .count()
  .orderBy(col("count").desc)

println("📊 Postulantes por carrera:")
conteoCarreras.show()

// Agrupación múltiple: por departamento y sexo
val conteoMultiple = spark.sql("SELECT * FROM postulantes")
  .groupBy("departamento", "sexo")
  .count()
  .orderBy(col("departamento").asc, col("count").desc)

println("\n🏛️ Postulantes por departamento y sexo:")
conteoMultiple.show(15)

// Agrupación con filtro previo
val conteoFiltrado = spark.sql("SELECT * FROM postulantes")
  .filter(col("edad").between(18, 22))
  .groupBy("modalidad")
  .count()
  .orderBy(col("count").desc)

println("\n🎓 Jóvenes (18-22 años) por modalidad:")
conteoFiltrado.show()


🔍 CONSULTA BÁSICA 4: Agrupaciones con count

📊 Postulantes por carrera:
📊 Postulantes por carrera:
+--------------------+-----+
|             escuela|count|
+--------------------+-----+
|    INGENIERÍA CIVIL| 1813|
|INGENIERÍA AGRONÓ...| 1529|
|INGENIERÍA DE ALI...|  744|
|          ECOTURISMO|  506|
|            ECONOMÍA|  207|
|        CONTABILIDAD|  129|
+--------------------+-----+


🏛️ Postulantes por departamento y sexo:
+--------------------+-----+
|             escuela|count|
+--------------------+-----+
|    INGENIERÍA CIVIL| 1813|
|INGENIERÍA AGRONÓ...| 1529|
|INGENIERÍA DE ALI...|  744|
|          ECOTURISMO|  506|
|            ECONOMÍA|  207|
|        CONTABILIDAD|  129|
+--------------------+-----+


🏛️ Postulantes por departamento y sexo:
+------------+---------+-----+
|departamento|     sexo|count|
+------------+---------+-----+
|    APURÍMAC|MASCULINO|   20|
|    APURÍMAC| FEMENINO|   11|
|    AREQUIPA| FEMENINO|   15|
|    AREQUIPA|MASCULINO|   15|
|    AYACUCHO|MASCU

[36mconteoCarreras[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [escuela: string, count: bigint]
[36mconteoMultiple[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [departamento: string, sexo: string ... 1 more field]
[36mconteoFiltrado[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [modalidad: string, count: bigint]

In [209]:
// CONSULTA BÁSICA 5: Promedio de una columna
println("\n🔍 CONSULTA BÁSICA 5: Promedio de columnas numéricas")

// Promedio de edad por carrera
val promedioEdadCarrera = spark.sql("SELECT * FROM postulantes")
  .filter(col("edad").isNotNull)
  .groupBy("escuela")
  .agg(avg("edad").alias("edad_promedio"))
  .orderBy(col("edad_promedio").desc)

println("📊 Edad promedio por carrera:")
promedioEdadCarrera.show()

// Promedio de pagos por departamento
val promedioPagoDept = spark.sql("SELECT * FROM postulantes")
  .filter(col("pago").isNotNull && col("pago") > 0)
  .groupBy("departamento")
  .agg(
    avg("pago").alias("pago_promedio"),
    count("*").alias("total_postulantes")
  )
  .orderBy(col("pago_promedio").desc)

println("\n💰 Pago promedio por departamento:")
promedioPagoDept.show()

// Múltiples promedios
val estadisticasGenerales = spark.sql("SELECT * FROM postulantes")
  .filter(col("edad").isNotNull && col("pago").isNotNull && col("pago") > 0)
  .agg(
    avg("edad").alias("edad_promedio_general"),
    avg("pago").alias("pago_promedio_general"),
    count("*").alias("total_registros")
  )

println("\n📈 Estadísticas generales:")
estadisticasGenerales.show()


🔍 CONSULTA BÁSICA 5: Promedio de columnas numéricas
📊 Edad promedio por carrera:

📊 Edad promedio por carrera:
+--------------------+------------------+
|             escuela|     edad_promedio|
+--------------------+------------------+
|            ECONOMÍA|20.502415458937197|
|    INGENIERÍA CIVIL| 20.42746828461114|
|INGENIERÍA AGRONÓ...|20.199476782210596|
|INGENIERÍA DE ALI...|20.052419354838708|
|          ECOTURISMO|20.033596837944664|
|        CONTABILIDAD|19.434108527131784|
+--------------------+------------------+

+--------------------+------------------+
|             escuela|     edad_promedio|
+--------------------+------------------+
|            ECONOMÍA|20.502415458937197|
|    INGENIERÍA CIVIL| 20.42746828461114|
|INGENIERÍA AGRONÓ...|20.199476782210596|
|INGENIERÍA DE ALI...|20.052419354838708|
|          ECOTURISMO|20.033596837944664|
|        CONTABILIDAD|19.434108527131784|
+--------------------+------------------+


💰 Pago promedio por departamento:

💰 Pago pro

[36mpromedioEdadCarrera[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [escuela: string, edad_promedio: double]
[36mpromedioPagoDept[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [departamento: string, pago_promedio: double ... 1 more field]
[36mestadisticasGenerales[39m: [32mDataFrame[39m = [edad_promedio_general: double, pago_promedio_general: double ... 1 more field]

## 🔗 SECCIÓN 6: CINCO CONSULTAS CON JOIN

Consultas utilizando diferentes tipos de JOIN para combinar datos. Como trabajamos con una sola tabla, crearemos vistas derivadas para demostrar las operaciones JOIN.

In [210]:
// PREPARACIÓN: Crear tablas derivadas para JOIN
println("🔧 PREPARACIÓN: Creando vistas derivadas para consultas JOIN")

// Vista 1: Resumen por carreras
val resumenCarreras = spark.sql("SELECT * FROM postulantes")
  .groupBy("escuela")
  .agg(
    count("*").alias("total_postulantes"),
    avg("edad").alias("edad_promedio"),
    avg("pago").alias("pago_promedio")
  )

resumenCarreras.createOrReplaceTempView("resumen_carreras")

// Vista 2: Resumen por departamentos
val resumenDepartamentos = spark.sql("SELECT * FROM postulantes")
  .groupBy("departamento")
  .agg(
    count("*").alias("total_estudiantes"),
    countDistinct("escuela").alias("carreras_ofertadas")
  )

resumenDepartamentos.createOrReplaceTempView("resumen_departamentos")

// Vista 3: Postulantes destacados (pago alto)
val postulantesDestacados = spark.sql("SELECT * FROM postulantes")
  .filter(col("pago") > 200)
  .select("uuid", "escuela", "departamento", "pago", "edad")

postulantesDestacados.createOrReplaceTempView("postulantes_destacados")

// Vista 4: Distribución por sexo y modalidad
val distribucionSexoModalidad = spark.sql("SELECT * FROM postulantes")
  .groupBy("sexo", "modalidad")
  .agg(count("*").alias("cantidad"))

distribucionSexoModalidad.createOrReplaceTempView("distribucion_sexo_modalidad")

println("✅ Vistas preparadas para consultas JOIN")

🔧 PREPARACIÓN: Creando vistas derivadas para consultas JOIN

✅ Vistas preparadas para consultas JOIN
✅ Vistas preparadas para consultas JOIN


[36mresumenCarreras[39m: [32mDataFrame[39m = [escuela: string, total_postulantes: bigint ... 2 more fields]
[36mresumenDepartamentos[39m: [32mDataFrame[39m = [departamento: string, total_estudiantes: bigint ... 1 more field]
[36mpostulantesDestacados[39m: [32mDataFrame[39m = [uuid: string, escuela: string ... 3 more fields]
[36mdistribucionSexoModalidad[39m: [32mDataFrame[39m = [sexo: string, modalidad: string ... 1 more field]

In [211]:
// JOIN 1: Inner Join - Postulantes con estadísticas de su carrera
println("\n🔗 JOIN 1: Inner Join - Postulantes con estadísticas de carrera")

val join1 = spark.sql("SELECT * FROM postulantes").as("p")
  .join(
    resumenCarreras.as("rc"),
    col("p.escuela") === col("rc.escuela"),
    "inner"
  )
  .select(
    col("p.uuid"),
    col("p.escuela"),
    col("p.edad"),
    col("p.pago"),
    col("rc.total_postulantes"),
    col("rc.edad_promedio"),
    col("rc.pago_promedio")
  )
  .orderBy(col("rc.total_postulantes").desc)

join1.show(10)
println(s"📊 Registros con información de carrera: ${join1.count()}")


🔗 JOIN 1: Inner Join - Postulantes con estadísticas de carrera

+--------------------+----------------+----+----+-----------------+-----------------+-----------------+
|                uuid|         escuela|edad|pago|total_postulantes|    edad_promedio|    pago_promedio|
+--------------------+----------------+----+----+-----------------+-----------------+-----------------+
|eb35ae9c84a9e2393...|INGENIERÍA CIVIL|  20| 250|             1813|20.42746828461114|187.8599007170436|
|41b4b33d0e004fc7d...|INGENIERÍA CIVIL|  19| 250|             1813|20.42746828461114|187.8599007170436|
|b0139127f9cb989ac...|INGENIERÍA CIVIL|  21| 250|             1813|20.42746828461114|187.8599007170436|
|112ebb674090e2d3d...|INGENIERÍA CIVIL|  20| 250|             1813|20.42746828461114|187.8599007170436|
|90a484b69506d0ad3...|INGENIERÍA CIVIL|  20| 250|             1813|20.42746828461114|187.8599007170436|
|38c975de6a895f557...|INGENIERÍA CIVIL|  20| 250|             1813|20.42746828461114|187.8599007170436|

[36mjoin1[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, escuela: string ... 5 more fields]

In [212]:
// JOIN 2: Left Join - Todos los postulantes con info de departamento
println("\n🔗 JOIN 2: Left Join - Postulantes con estadísticas departamentales")

val join2 = spark.sql("SELECT * FROM postulantes").as("p")
  .join(
    resumenDepartamentos.as("rd"),
    col("p.departamento") === col("rd.departamento"),
    "left"
  )
  .select(
    col("p.uuid"),
    col("p.departamento"),
    col("p.escuela"),
    col("rd.total_estudiantes"),
    col("rd.carreras_ofertadas")
  )
  .filter(col("rd.total_estudiantes").isNotNull)
  .orderBy(col("rd.total_estudiantes").desc)

join2.show(10)
println(s"🌍 Postulantes con info departamental: ${join2.count()}")


🔗 JOIN 2: Left Join - Postulantes con estadísticas departamentales

+--------------------+------------+--------------------+-----------------+------------------+
|                uuid|departamento|             escuela|total_estudiantes|carreras_ofertadas|
+--------------------+------------+--------------------+-----------------+------------------+
|eb35ae9c84a9e2393...|       CUSCO|    INGENIERÍA CIVIL|             4483|                 6|
|b3505291148cdbfcf...|       CUSCO|    INGENIERÍA CIVIL|             4483|                 6|
|37aa167373b8ddd18...|       CUSCO|          ECOTURISMO|             4483|                 6|
|41b4b33d0e004fc7d...|       CUSCO|    INGENIERÍA CIVIL|             4483|                 6|
|60d7c1c310363c4b2...|       CUSCO|INGENIERÍA AGRONÓ...|             4483|                 6|
|28a9a7dc2ae0a4c9d...|       CUSCO|INGENIERÍA DE ALI...|             4483|                 6|
|b0139127f9cb989ac...|       CUSCO|    INGENIERÍA CIVIL|             4483|           

[36mjoin2[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, departamento: string ... 3 more fields]

In [213]:
// JOIN 3: Right Join - Solo postulantes destacados con info completa
println("\n🔗 JOIN 3: Right Join - Postulantes destacados con contexto")

val join3 = resumenCarreras.as("rc")
  .join(
    postulantesDestacados.as("pd"),
    col("rc.escuela") === col("pd.escuela"),
    "right"
  )
  .select(
    col("pd.uuid"),
    col("pd.escuela"),
    col("pd.pago"),
    col("pd.edad"),
    col("rc.total_postulantes"),
    col("rc.pago_promedio")
  )
  .orderBy(col("pd.pago").desc)

join3.show(10)
println(s"⭐ Postulantes destacados con contexto: ${join3.count()}")


🔗 JOIN 3: Right Join - Postulantes destacados con contexto

+--------------------+--------------------+----+----+-----------------+------------------+
|                uuid|             escuela|pago|edad|total_postulantes|     pago_promedio|
+--------------------+--------------------+----+----+-----------------+------------------+
|47a8d23129f81caf5...|    INGENIERÍA CIVIL| 670|  40|             1813| 187.8599007170436|
|b1ce2631353d05962...|    INGENIERÍA CIVIL| 670|  36|             1813| 187.8599007170436|
|9955eb96199807478...|    INGENIERÍA CIVIL| 670|  31|             1813| 187.8599007170436|
|4c7d00d8349d30f5d...|    INGENIERÍA CIVIL| 670|  44|             1813| 187.8599007170436|
|ac6f6a84abb226401...|    INGENIERÍA CIVIL| 670|  40|             1813| 187.8599007170436|
|9ccd26197f39a7bd5...|    INGENIERÍA CIVIL| 670|  30|             1813| 187.8599007170436|
|ad73d49724aff6044...|    INGENIERÍA CIVIL| 670|  24|             1813| 187.8599007170436|
|10c78e221144ae1a4...|INGENIE

[36mjoin3[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, escuela: string ... 4 more fields]

In [214]:
// JOIN 4: Full Outer Join - Comparación completa entre dos vistas
println("\n🔗 JOIN 4: Full Outer Join - Comparación completa de datos")

// Crear vista adicional para el full outer join
val postulantesCarrera = spark.sql("SELECT * FROM postulantes")
  .groupBy("escuela")
  .agg(
    sum(when(col("sexo") === "MASCULINO", 1).otherwise(0)).alias("hombres"),
    sum(when(col("sexo") === "FEMENINO", 1).otherwise(0)).alias("mujeres")
  )

val join4 = resumenCarreras.as("rc")
  .join(
    postulantesCarrera.as("pc"),
    col("rc.escuela") === col("pc.escuela"),
    "full_outer"
  )
  .select(
    coalesce(col("rc.escuela"), col("pc.escuela")).alias("carrera"),
    col("rc.total_postulantes"),
    col("pc.hombres"),
    col("pc.mujeres"),
    col("rc.edad_promedio")
  )
  .orderBy(col("total_postulantes").desc_nulls_last)

join4.show(10)
println(s"🔄 Registros en comparación completa: ${join4.count()}")


🔗 JOIN 4: Full Outer Join - Comparación completa de datos

+--------------------+-----------------+-------+-------+------------------+
|             carrera|total_postulantes|hombres|mujeres|     edad_promedio|
+--------------------+-----------------+-------+-------+------------------+
|    INGENIERÍA CIVIL|             1813|   1339|    474| 20.42746828461114|
|INGENIERÍA AGRONÓ...|             1529|    941|    588|20.199476782210596|
|INGENIERÍA DE ALI...|              744|    319|    425|20.052419354838708|
|          ECOTURISMO|              506|    214|    292|20.033596837944664|
|            ECONOMÍA|              207|    122|     85|20.502415458937197|
|        CONTABILIDAD|              129|     65|     64|19.434108527131784|
+--------------------+-----------------+-------+-------+------------------+

+--------------------+-----------------+-------+-------+------------------+
|             carrera|total_postulantes|hombres|mujeres|     edad_promedio|
+--------------------+-----

[36mpostulantesCarrera[39m: [32mDataFrame[39m = [escuela: string, hombres: bigint ... 1 more field]
[36mjoin4[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [carrera: string, total_postulantes: bigint ... 3 more fields]

In [215]:
// JOIN 5: Cross Join (limitado) - Combinaciones de modalidad y sexo
println("\n🔗 JOIN 5: Cross Join - Análisis de combinaciones")

// Obtener modalidades únicas
val modalidadesUnicas = spark.sql("SELECT DISTINCT modalidad FROM postulantes")
modalidadesUnicas.createOrReplaceTempView("modalidades_unicas")

// Obtener sexos únicos 
val sexosUnicos = spark.sql("SELECT DISTINCT sexo FROM postulantes")
sexosUnicos.createOrReplaceTempView("sexos_unicos")

// Cross join limitado con agregación posterior
val join5 = modalidadesUnicas.as("m")
  .crossJoin(sexosUnicos.as("s"))
  .join(
    distribucionSexoModalidad.as("dsm"),
    col("m.modalidad") === col("dsm.modalidad") && 
    col("s.sexo") === col("dsm.sexo"),
    "left"
  )
  .select(
    col("m.modalidad"),
    col("s.sexo"),
    coalesce(col("dsm.cantidad"), lit(0)).alias("cantidad")
  )
  .orderBy(col("modalidad"), col("sexo"))

join5.show()
println(s"❌ Todas las combinaciones posibles: ${join5.count()}")
println("Nota: Cross join muestra todas las combinaciones posibles de modalidad x sexo")


🔗 JOIN 5: Cross Join - Análisis de combinaciones

+--------------------+---------+--------+
|           modalidad|     sexo|cantidad|
+--------------------+---------+--------+
|             BECA 18| FEMENINO|      28|
|             BECA 18|MASCULINO|      10|
|EXTRAORDINARIO-CO...| FEMENINO|     308|
|EXTRAORDINARIO-CO...|MASCULINO|     446|
|EXTRAORDINARIO-GR...| FEMENINO|       3|
|EXTRAORDINARIO-GR...|MASCULINO|       6|
|EXTRAORDINARIO-PE...| FEMENINO|       2|
|EXTRAORDINARIO-PE...|MASCULINO|      13|
|EXTRAORDINARIO-PR...| FEMENINO|      55|
|EXTRAORDINARIO-PR...|MASCULINO|      55|
|EXTRAORDINARIO-VI...| FEMENINO|      24|
|EXTRAORDINARIO-VI...|MASCULINO|      41|
|           ORDINARIO| FEMENINO|    1027|
|           ORDINARIO|MASCULINO|    1817|
|PRIMERA OPORTUNID...| FEMENINO|     481|
|PRIMERA OPORTUNID...|MASCULINO|     612|
+--------------------+---------+--------+

+--------------------+---------+--------+
|           modalidad|     sexo|cantidad|
+--------------------+--

[36mmodalidadesUnicas[39m: [32mDataFrame[39m = [modalidad: string]
[36msexosUnicos[39m: [32mDataFrame[39m = [sexo: string]
[36mjoin5[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [modalidad: string, sexo: string ... 1 more field]

## ⚡ SECCIÓN 7: CINCO CONSULTAS USANDO FUNCIONES DE SPARK

Consultas utilizando funciones avanzadas de `org.apache.spark.sql.functions._` para manipulación y análisis de datos.

In [216]:
// FUNCIÓN 1: Funciones de cadena y manipulación de texto
println("⚡ FUNCIÓN 1: Manipulación de cadenas y texto")

val funcionesCadena = spark.sql("SELECT * FROM postulantes")
  .select(
    col("uuid"),
    col("escuela"),
    // Funciones de cadena
    upper(col("escuela")).alias("escuela_mayuscula"),
    lower(col("departamento")).alias("departamento_minuscula"),
    length(col("escuela")).alias("longitud_carrera"),
    substring(col("uuid"), 1, 4).alias("prefijo_uuid"),
    concat(col("departamento"), lit(" - "), col("escuela")).alias("ubicacion_carrera"),
    // Funciones condicionales
    when(col("sexo") === "MASCULINO", "M")
      .when(col("sexo") === "FEMENINO", "F")
      .otherwise("N/A").alias("sexo_abrev"),
    // Funciones de reemplazo
    regexp_replace(col("escuela"), "INGENIERÍA", "ING.").alias("carrera_abrev")
  )
  .filter(col("longitud_carrera") > 10)
  .orderBy(col("longitud_carrera").desc)

funcionesCadena.show(10, truncate = false)
println(s"📝 Registros procesados con funciones de cadena: ${funcionesCadena.count()}")

⚡ FUNCIÓN 1: Manipulación de cadenas y texto

+----------------------------------------------------------------+------------------------------+------------------------------+----------------------+----------------+------------+--------------------------------------+----------+------------------------+
|uuid                                                            |escuela                       |escuela_mayuscula             |departamento_minuscula|longitud_carrera|prefijo_uuid|ubicacion_carrera                     |sexo_abrev|carrera_abrev           |
+----------------------------------------------------------------+------------------------------+------------------------------+----------------------+----------------+------------+--------------------------------------+----------+------------------------+
|ca3c1c3907c5bdf3e0361051164a46705045cd507d7fe88dfb9e4dffd7da3343|INGENIERÍA AGRONÓMICA TROPICAL|INGENIERÍA AGRONÓMICA TROPICAL|cusco                 |30              |ca3c        |CU

[36mfuncionesCadena[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, escuela: string ... 7 more fields]

In [217]:
// FUNCIÓN 2: Funciones matemáticas y estadísticas
println("\n⚡ FUNCIÓN 2: Funciones matemáticas y estadísticas")

import org.apache.spark.sql.expressions.Window

val funcionesMatematicas = spark.sql("""
  SELECT 
    uuid,
    edad,
    pago,
    -- Funciones matemáticas
    ROUND(pago / 10.0, 2) as pago_decenas,
    SQRT(pago) as raiz_pago,
    POWER(edad, 2) as edad_cuadrado,
    ABS(edad - 20) as diferencia_edad_20,
    -- Funciones de agregación en ventana
    LOG(pago + 1) as log_pago,
    EXP(edad / 100.0) as exp_edad,
    -- Rangos y percentiles
    NTILE(4) OVER (ORDER BY pago) as cuartil_pago,
    PERCENT_RANK() OVER (ORDER BY edad) as percentil_edad
  FROM postulantes 
  WHERE edad IS NOT NULL AND pago IS NOT NULL AND pago > 0
  ORDER BY pago DESC
""")

funcionesMatematicas.show(10)
println(s"🔢 Registros con cálculos matemáticos: ${funcionesMatematicas.count()}")


⚡ FUNCIÓN 2: Funciones matemáticas y estadísticas

+--------------------+----+----+------------+------------------+-------------+------------------+-----------------+------------------+------------+-------------------+
|                uuid|edad|pago|pago_decenas|         raiz_pago|edad_cuadrado|diferencia_edad_20|         log_pago|          exp_edad|cuartil_pago|     percentil_edad|
+--------------------+----+----+------------+------------------+-------------+------------------+-----------------+------------------+------------+-------------------+
|47a8d23129f81caf5...|  40| 670|       67.00| 25.88435821108957|       1600.0|                20|6.508769136971682|1.4918246976412703|           4| 0.9938486774656551|
|10c78e221144ae1a4...|  23| 670|       67.00| 25.88435821108957|        529.0|                 3|6.508769136971682|1.2586000099294778|           4|  0.834939511995079|
|a2b9bb2f6cada7b14...|  24| 670|       67.00| 25.88435821108957|        576.0|                 4|6.508769136

[32mimport [39m[36morg.apache.spark.sql.expressions.Window[39m
[36mfuncionesMatematicas[39m: [32mDataFrame[39m = [uuid: string, edad: int ... 9 more fields]

In [218]:
// FUNCIÓN 3: Funciones de fecha y tiempo
println("\n⚡ FUNCIÓN 3: Funciones de fecha y tiempo")

val funcionesFecha = spark.sql("SELECT * FROM postulantes")
  .select(
    col("uuid"),
    col("annio_postulacion"),
    col("edad"),
    // Crear fecha de nacimiento estimada
    (col("annio_postulacion") - col("edad")).alias("anio_nacimiento_estimado"),
    // Funciones de fecha
    current_date().alias("fecha_actual"),
    current_timestamp().alias("timestamp_actual"),
    // Fecha de postulación estimada (asumiendo enero)
    to_date(concat(col("annio_postulacion"), lit("-01-01"))).alias("fecha_postulacion"),
    // Calcular diferencia con fecha actual
    year(current_date()).alias("anio_actual"),
    (year(current_date()) - col("annio_postulacion")).alias("anios_desde_postulacion"),
    // Funciones de formato
    date_format(current_date(), "dd/MM/yyyy").alias("fecha_formateada"),
    // Categorizar por década de nacimiento
    when(col("annio_postulacion") - col("edad") >= 2000, "2000s")
      .when(col("annio_postulacion") - col("edad") >= 1990, "1990s")
      .when(col("annio_postulacion") - col("edad") >= 1980, "1980s")
      .otherwise("Anterior").alias("decada_nacimiento")
  )
  .filter(col("edad").isNotNull)
  .orderBy(col("anio_nacimiento_estimado").desc)

funcionesFecha.show(10)
println(s"📅 Registros con información temporal: ${funcionesFecha.count()}")


⚡ FUNCIÓN 3: Funciones de fecha y tiempo

+--------------------+-----------------+----+------------------------+------------+--------------------+-----------------+-----------+-----------------------+----------------+-----------------+
|                uuid|annio_postulacion|edad|anio_nacimiento_estimado|fecha_actual|    timestamp_actual|fecha_postulacion|anio_actual|anios_desde_postulacion|fecha_formateada|decada_nacimiento|
+--------------------+-----------------+----+------------------------+------------+--------------------+-----------------+-----------+-----------------------+----------------+-----------------+
|8e9fada267e9d84e1...|             2023|  16|                    2007|  2025-06-09|2025-06-09 23:22:...|       2023-01-01|       2025|                      2|      09/06/2025|            2000s|
|0ea9b8ce3a8b82f1e...|             2023|  16|                    2007|  2025-06-09|2025-06-09 23:22:...|       2023-01-01|       2025|                      2|      09/06/2025|      

[36mfuncionesFecha[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, annio_postulacion: int ... 9 more fields]

In [219]:
// FUNCIÓN 4: Funciones de agregación y ventana
println("\n⚡ FUNCIÓN 4: Funciones de agregación y ventana")

import org.apache.spark.sql.expressions.Window

// Definir ventanas
val ventanaDepartamento = Window.partitionBy("departamento").orderBy(col("pago").desc)
val ventanaCarrera = Window.partitionBy("escuela")
val ventanaGeneral = Window.orderBy(col("pago").desc)

val funcionesVentana = spark.sql("SELECT * FROM postulantes")
  .filter(col("pago").isNotNull && col("pago") > 0)
  .select(
    col("uuid"),
    col("departamento"),
    col("escuela"),
    col("pago"),
    col("edad"),
    // Funciones de ranking
    row_number().over(ventanaDepartamento).alias("ranking_pago_dept"),
    rank().over(ventanaCarrera.orderBy(col("pago").desc)).alias("rank_pago_carrera"),
    dense_rank().over(ventanaGeneral).alias("dense_rank_general"),
    // Funciones de agregación en ventana
    sum(col("pago")).over(ventanaCarrera).alias("total_pago_carrera"),
    avg(col("edad")).over(ventanaDepartamento).alias("edad_prom_dept"),
    max(col("pago")).over(ventanaCarrera).alias("pago_max_carrera"),
    min(col("pago")).over(ventanaCarrera).alias("pago_min_carrera"),
    // Funciones de desplazamiento
    lag(col("pago"), 1).over(ventanaDepartamento).alias("pago_anterior"),
    lead(col("pago"), 1).over(ventanaDepartamento).alias("pago_siguiente"),
    // Percentiles
    percent_rank().over(ventanaGeneral).alias("percentil_pago")
  )
  .orderBy(col("pago").desc)

funcionesVentana.show(15)
println(s"🎥 Registros con funciones de ventana: ${funcionesVentana.count()}")


⚡ FUNCIÓN 4: Funciones de agregación y ventana

+--------------------+------------+--------------------+----+----+-----------------+-----------------+------------------+------------------+------------------+----------------+----------------+-------------+--------------+--------------------+
|                uuid|departamento|             escuela|pago|edad|ranking_pago_dept|rank_pago_carrera|dense_rank_general|total_pago_carrera|    edad_prom_dept|pago_max_carrera|pago_min_carrera|pago_anterior|pago_siguiente|      percentil_pago|
+--------------------+------------+--------------------+----+----+-----------------+-----------------+------------------+------------------+------------------+----------------+----------------+-------------+--------------+--------------------+
|10c78e221144ae1a4...|       CUSCO|INGENIERÍA AGRONÓ...| 670|  23|                1|                1|                 1|            263070|30.285714285714285|             670|              50|         NULL|           6

[32mimport [39m[36morg.apache.spark.sql.expressions.Window[39m
[36mventanaDepartamento[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@2bbab9bf
[36mventanaCarrera[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@3ae351a3
[36mventanaGeneral[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@1851eddd
[36mfuncionesVentana[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, departamento: string ... 13 more fields]

In [220]:
// FUNCIÓN 5: Funciones de colección y estructuras complejas
println("\n⚡ FUNCIÓN 5: Funciones de colección y estructuras")

val funcionesColeccion = spark.sql("SELECT * FROM postulantes")
  .select(
    col("uuid"),
    col("departamento"),
    col("escuela"),
    col("modalidad"),
    col("sexo"),
    col("pago"),
    // Crear arrays y estructuras
    array(col("departamento"), col("escuela")).alias("ubicacion_array"),
    struct(
      col("sexo").alias("genero"),
      col("edad").alias("años"),
      col("modalidad").alias("tipo_ingreso")
    ).alias("perfil_estudiante"),
    // Funciones de mapeo
    map(
      lit("departamento"), col("departamento"),
      lit("carrera"), col("escuela"),
      lit("pago"), col("pago").cast("string")
    ).alias("datos_mapa"),
    // Crear JSON
    to_json(
      struct(
        col("uuid").alias("id"),
        col("escuela").alias("career"),
        col("pago").alias("payment")
      )
    ).alias("datos_json"),
    // Funciones de hash
    hash(col("uuid"), col("escuela")).alias("hash_unico"),
    md5(concat(col("uuid"), col("departamento"))).alias("md5_identificador"),
    // Funciones de conversión
    col("pago").cast("double").alias("pago_double"),
    coalesce(col("pago"), lit(0)).alias("pago_sin_null")
  )
  .filter(col("pago").isNotNull)
  .orderBy(col("hash_unico").desc)

funcionesColeccion.show(10, truncate = false)
println(s"📦 Registros con estructuras complejas: ${funcionesColeccion.count()}")

// Mostrar algunos campos específicos para mejor visualización
println("\n🔍 Ejemplos de estructuras creadas:")
funcionesColeccion.select("uuid", "ubicacion_array", "datos_json").show(5, truncate = false)


⚡ FUNCIÓN 5: Funciones de colección y estructuras

+----------------------------------------------------------------+------------+------------------------------+-----------------------------------------------------------------+---------+----+---------------------------------------+----------------------------------------------------------------------------------+-------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------+-----------+-------------+
|uuid                                                            |departamento|escuela                       |modalidad                                                        |sexo     |pago|ubicacion_array                        |perfil_estudiante                                                                 |datos_mapa                             

[36mfuncionesColeccion[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [uuid: string, departamento: string ... 12 more fields]

## 📊 SECCIÓN 8: VISTAS TEMPORALES Y SQL PURO

Consultas utilizando **vistas temporales** y **SQL puro** para realizar operaciones complejas con joins, agrupaciones y ordenamientos.

In [221]:
// PREPARACIÓN: Crear vistas temporales adicionales para SQL
println("📋 PREPARACIÓN: Creando vistas temporales para SQL puro")

// Vista temporal para análisis geográfico
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW vista_geografica AS
  SELECT 
    departamento,
    provincia,
    distrito_origen,
    COUNT(*) as total_postulantes,
    AVG(edad) as edad_promedio,
    SUM(CASE WHEN sexo = 'MASCULINO' THEN 1 ELSE 0 END) as hombres,
    SUM(CASE WHEN sexo = 'FEMENINO' THEN 1 ELSE 0 END) as mujeres
  FROM postulantes 
  GROUP BY departamento, provincia, distrito_origen
""")

// Vista temporal para análisis académico
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW vista_academica AS
  SELECT 
    escuela,
    modalidad,
    COUNT(*) as cantidad_postulantes,
    AVG(pago) as pago_promedio,
    MAX(pago) as pago_maximo,
    MIN(pago) as pago_minimo
  FROM postulantes 
  WHERE pago IS NOT NULL AND pago > 0
  GROUP BY escuela, modalidad
""")

// Vista temporal para análisis de habilidades
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW vista_habilidades AS
  SELECT 
    idioma,
    lee,
    escribe,
    habla,
    COUNT(*) as total,
    AVG(edad) as edad_promedio
  FROM postulantes 
  GROUP BY idioma, lee, escribe, habla
""")

println("✅ Vistas temporales creadas para consultas SQL")

📋 PREPARACIÓN: Creando vistas temporales para SQL puro

✅ Vistas temporales creadas para consultas SQL
✅ Vistas temporales creadas para consultas SQL


[36mres221_1[39m: [32mDataFrame[39m = []
[36mres221_2[39m: [32mDataFrame[39m = []
[36mres221_3[39m: [32mDataFrame[39m = []

In [222]:
// SQL JOIN 1: Postulantes con contexto geográfico
println("\n📋 SQL JOIN 1: Postulantes con contexto geográfico regional")

val sqlJoin1 = spark.sql("""
  SELECT 
    p.uuid,
    p.escuela,
    p.departamento,
    p.edad,
    vg.total_postulantes as postulantes_region,
    vg.edad_promedio as edad_prom_region,
    ROUND((p.edad - vg.edad_promedio), 2) as diferencia_edad_region,
    vg.hombres,
    vg.mujeres,
    ROUND((vg.hombres * 100.0 / (vg.hombres + vg.mujeres)), 2) as pct_masculino_region
  FROM postulantes p
  INNER JOIN vista_geografica vg 
    ON p.departamento = vg.departamento 
    AND p.provincia = vg.provincia
    AND p.distrito_origen = vg.distrito_origen
  WHERE p.edad IS NOT NULL
  ORDER BY postulantes_region DESC, diferencia_edad_region DESC
  LIMIT 15
""")

sqlJoin1.show()
println(s"🌍 Registros con contexto geográfico: ${sqlJoin1.count()}")


📋 SQL JOIN 1: Postulantes con contexto geográfico regional

+--------------------+--------------------+------------+----+------------------+-----------------+----------------------+-------+-------+--------------------+
|                uuid|             escuela|departamento|edad|postulantes_region| edad_prom_region|diferencia_edad_region|hombres|mujeres|pct_masculino_region|
+--------------------+--------------------+------------+----+------------------+-----------------+----------------------+-------+-------+--------------------+
|8e5746586d5068b29...|INGENIERÍA DE ALI...|       CUSCO|  52|              1733|19.90998268897865|                 32.09|   1010|    723|               58.28|
|8515ed9e2a0423ddb...|    INGENIERÍA CIVIL|       CUSCO|  43|              1733|19.90998268897865|                 23.09|   1010|    723|               58.28|
|de7ececcce0f8c04b...|    INGENIERÍA CIVIL|       CUSCO|  42|              1733|19.90998268897865|                 22.09|   1010|    723|       

[36msqlJoin1[39m: [32mDataFrame[39m = [uuid: string, escuela: string ... 8 more fields]

In [223]:
// SQL JOIN 2: Análisis de carreras por modalidad con comparaciones
println("\n📋 SQL JOIN 2: Carreras y modalidades con métricas comparativas")

val sqlJoin2 = spark.sql("""
  SELECT 
    p.escuela,
    p.modalidad,
    COUNT(p.uuid) as postulantes_directos,
    va.cantidad_postulantes as total_modalidad_carrera,
    va.pago_promedio,
    va.pago_maximo,
    va.pago_minimo,
    ROUND((COUNT(p.uuid) * 100.0 / va.cantidad_postulantes), 2) as pct_participacion,
    CASE 
      WHEN va.pago_promedio > 200 THEN 'ALTO'
      WHEN va.pago_promedio > 100 THEN 'MEDIO'
      ELSE 'BAJO'
    END as nivel_pago
  FROM postulantes p
  LEFT JOIN vista_academica va 
    ON p.escuela = va.escuela AND p.modalidad = va.modalidad
  WHERE p.pago IS NOT NULL
  GROUP BY p.escuela, p.modalidad, va.cantidad_postulantes, 
           va.pago_promedio, va.pago_maximo, va.pago_minimo
  ORDER BY pago_promedio DESC, postulantes_directos DESC
""")

sqlJoin2.show(15, truncate = false)
println(s"🎓 Combinaciones carrera-modalidad analizadas: ${sqlJoin2.count()}")


📋 SQL JOIN 2: Carreras y modalidades con métricas comparativas

+------------------------------+-----------------------------------------------------------------+--------------------+-----------------------+------------------+-----------+-----------+-----------------+----------+
|escuela                       |modalidad                                                        |postulantes_directos|total_modalidad_carrera|pago_promedio     |pago_maximo|pago_minimo|pct_participacion|nivel_pago|
+------------------------------+-----------------------------------------------------------------+--------------------+-----------------------+------------------+-----------+-----------+-----------------+----------+
|INGENIERÍA CIVIL              |EXTRAORDINARIO-GRADUADOS Y TITULADOS                             |7                   |7                      |670.0             |670        |670        |100.00           |ALTO      |
|INGENIERÍA DE ALIMENTOS       |EXTRAORDINARIO-GRADUADOS Y TITULADOS   

[36msqlJoin2[39m: [32mDataFrame[39m = [escuela: string, modalidad: string ... 7 more fields]

In [224]:
// SQL JOIN 3: Análisis de competencias lingüísticas
println("\n📋 SQL JOIN 3: Competencias lingüísticas y perfiles académicos")

val sqlJoin3 = spark.sql("""
  SELECT 
    p.escuela,
    p.idioma,
    p.lee,
    p.escribe,
    p.habla,
    COUNT(p.uuid) as estudiantes_perfil,
    vh.total as total_perfil_linguistico,
    vh.edad_promedio as edad_prom_perfil,
    ROUND(AVG(p.edad), 2) as edad_prom_carrera_perfil,
    ROUND((COUNT(p.uuid) * 100.0 / vh.total), 2) as pct_carrera_en_perfil,
    CONCAT(p.lee, '-', p.escribe, '-', p.habla) as codigo_habilidades
  FROM postulantes p
  INNER JOIN vista_habilidades vh 
    ON p.idioma = vh.idioma 
    AND p.lee = vh.lee 
    AND p.escribe = vh.escribe 
    AND p.habla = vh.habla
  WHERE p.edad IS NOT NULL
  GROUP BY p.escuela, p.idioma, p.lee, p.escribe, p.habla, 
           vh.total, vh.edad_promedio
  HAVING COUNT(p.uuid) >= 5  -- Solo perfiles con al menos 5 estudiantes
  ORDER BY total_perfil_linguistico DESC, estudiantes_perfil DESC
""")

sqlJoin3.show(15, truncate = false)
println(s"🗣️ Perfiles lingüísticos-académicos analizados: ${sqlJoin3.count()}")


📋 SQL JOIN 3: Competencias lingüísticas y perfiles académicos

+------------------------------+-------+---+-------+-----+------------------+------------------------+------------------+------------------------+---------------------+------------------+
|escuela                       |idioma |lee|escribe|habla|estudiantes_perfil|total_perfil_linguistico|edad_prom_perfil  |edad_prom_carrera_perfil|pct_carrera_en_perfil|codigo_habilidades|
+------------------------------+-------+---+-------+-----+------------------+------------------------+------------------+------------------------+---------------------+------------------+
|INGENIERÍA CIVIL              |Español|SI |SI     |SI   |821               |2117                    |19.310817194142654|19.31                   |38.78                |SI-SI-SI          |
|INGENIERÍA AGRONÓMICA TROPICAL|Español|SI |SI     |SI   |565               |2117                    |19.310817194142654|19.18                   |26.69                |SI-SI-SI        

[36msqlJoin3[39m: [32mDataFrame[39m = [escuela: string, idioma: string ... 9 more fields]

In [225]:
// SQL JOIN 4: Join complejo con análisis multidimensional
println("\n📋 SQL JOIN 4: Análisis multidimensional con joins complejos")

val sqlJoin4 = spark.sql("""
  WITH estadisticas_dept AS (
    SELECT 
      departamento,
      COUNT(*) as total_dept,
      AVG(pago) as pago_prom_dept,
      AVG(edad) as edad_prom_dept
    FROM postulantes 
    WHERE pago IS NOT NULL AND edad IS NOT NULL
    GROUP BY departamento
  ),
  estadisticas_carrera AS (
    SELECT 
      escuela,
      COUNT(*) as total_carrera,
      AVG(pago) as pago_prom_carrera
    FROM postulantes 
    WHERE pago IS NOT NULL
    GROUP BY escuela
  )
  SELECT 
    p.uuid,
    p.departamento,
    p.escuela,
    p.pago,
    p.edad,
    ed.total_dept,
    ed.pago_prom_dept,
    ec.total_carrera,
    ec.pago_prom_carrera,
    ROUND((p.pago - ed.pago_prom_dept), 2) as diferencia_pago_dept,
    ROUND((p.pago - ec.pago_prom_carrera), 2) as diferencia_pago_carrera,
    CASE 
      WHEN p.pago > ed.pago_prom_dept AND p.pago > ec.pago_prom_carrera THEN 'SUPERIOR_AMBOS'
      WHEN p.pago > ed.pago_prom_dept THEN 'SUPERIOR_DEPT'
      WHEN p.pago > ec.pago_prom_carrera THEN 'SUPERIOR_CARRERA'
      ELSE 'INFERIOR_AMBOS'
    END as categoria_pago
  FROM postulantes p
  LEFT JOIN estadisticas_dept ed ON p.departamento = ed.departamento
  LEFT JOIN estadisticas_carrera ec ON p.escuela = ec.escuela
  WHERE p.pago IS NOT NULL AND p.edad IS NOT NULL
  ORDER BY diferencia_pago_dept DESC, diferencia_pago_carrera DESC
  LIMIT 20
""")

sqlJoin4.show(truncate = false)
println(s"📈 Análisis multidimensional completado")


📋 SQL JOIN 4: Análisis multidimensional con joins complejos

+----------------------------------------------------------------+------------+------------------------------+----+----+----------+------------------+-------------+------------------+--------------------+-----------------------+--------------+
|uuid                                                            |departamento|escuela                       |pago|edad|total_dept|pago_prom_dept    |total_carrera|pago_prom_carrera |diferencia_pago_dept|diferencia_pago_carrera|categoria_pago|
+----------------------------------------------------------------+------------+------------------------------+----+----+----------+------------------+-------------+------------------+--------------------+-----------------------+--------------+
|47a8d23129f81caf5e14b9f7bf7df5264d9e2f7ef4b589910963599245833672|AYACUCHO    |INGENIERÍA CIVIL              |670 |40  |152       |173.48684210526315|1813         |187.8599007170436 |496.51              |48

[36msqlJoin4[39m: [32mDataFrame[39m = [uuid: string, departamento: string ... 10 more fields]

In [226]:
// SQL JOIN 5: Join con funciones de ventana y ranking
println("\n📋 SQL JOIN 5: Rankings y ventanas con joins")

val sqlJoin5 = spark.sql("""
  WITH rankings AS (
    SELECT 
      uuid,
      departamento,
      escuela,
      pago,
      edad,
      ROW_NUMBER() OVER (PARTITION BY departamento ORDER BY pago DESC) as rank_pago_dept,
      ROW_NUMBER() OVER (PARTITION BY escuela ORDER BY pago DESC) as rank_pago_carrera,
      NTILE(4) OVER (ORDER BY pago) as cuartil_pago_general,
      PERCENT_RANK() OVER (ORDER BY edad) as percentil_edad
    FROM postulantes 
    WHERE pago IS NOT NULL AND edad IS NOT NULL
  ),
  promedios_dept AS (
    SELECT 
      departamento,
      AVG(pago) as pago_promedio,
      COUNT(*) as total_estudiantes
    FROM postulantes 
    WHERE pago IS NOT NULL
    GROUP BY departamento
  )
  SELECT 
    r.uuid,
    r.departamento,
    r.escuela,
    r.pago,
    r.edad,
    r.rank_pago_dept,
    r.rank_pago_carrera,
    r.cuartil_pago_general,
    ROUND(r.percentil_edad, 4) as percentil_edad,
    pd.pago_promedio as pago_prom_dept,
    pd.total_estudiantes,
    CASE 
      WHEN r.rank_pago_dept <= 3 THEN 'TOP_3_DEPARTAMENTO'
      WHEN r.rank_pago_carrera <= 3 THEN 'TOP_3_CARRERA'
      WHEN r.cuartil_pago_general = 4 THEN 'CUARTIL_SUPERIOR'
      ELSE 'REGULAR'
    END as categoria_ranking
  FROM rankings r
  INNER JOIN promedios_dept pd ON r.departamento = pd.departamento
  WHERE r.rank_pago_dept <= 5 OR r.rank_pago_carrera <= 5
  ORDER BY r.departamento, r.rank_pago_dept
""")

sqlJoin5.show(20, truncate = false)
println(s"🏆 Estudiantes destacados por ranking: ${sqlJoin5.count()}")


📋 SQL JOIN 5: Rankings y ventanas con joins

+----------------------------------------------------------------+------------+------------------------------+----+----+--------------+-----------------+--------------------+--------------+------------------+-----------------+------------------+
|uuid                                                            |departamento|escuela                       |pago|edad|rank_pago_dept|rank_pago_carrera|cuartil_pago_general|percentil_edad|pago_prom_dept    |total_estudiantes|categoria_ranking |
+----------------------------------------------------------------+------------+------------------------------+----+----+--------------+-----------------+--------------------+--------------+------------------+-----------------+------------------+
|ac6f6a84abb226401250a75edc6b90abe674380b59785fa5852e26a0df6c7e05|APURÍMAC    |INGENIERÍA CIVIL              |670 |40  |1             |1                |4                   |0.9939        |216.7741935483871 |31      

[36msqlJoin5[39m: [32mDataFrame[39m = [uuid: string, departamento: string ... 10 more fields]

### 📂 SUBSECCIÓN: CINCO CONSULTAS GROUPBY CON SQL

In [227]:
// SQL GROUPBY 1: Agrupación multidimensional por ubicación
println("📂 SQL GROUPBY 1: Estadísticas por departamento, provincia y tipo de comunidad")

val sqlGroupBy1 = spark.sql("""
  SELECT 
    departamento,
    provincia,
    tipo_comunidad,
    COUNT(*) as total_postulantes,
    COUNT(DISTINCT escuela) as carreras_disponibles,
    COUNT(DISTINCT modalidad) as modalidades_disponibles,
    SUM(CASE WHEN sexo = 'MASCULINO' THEN 1 ELSE 0 END) as hombres,
    SUM(CASE WHEN sexo = 'FEMENINO' THEN 1 ELSE 0 END) as mujeres,
    ROUND(AVG(edad), 2) as edad_promedio,
    ROUND(AVG(CASE WHEN pago > 0 THEN pago END), 2) as pago_promedio
  FROM postulantes 
  GROUP BY departamento, provincia, tipo_comunidad
  HAVING COUNT(*) >= 10  -- Solo grupos con al menos 10 postulantes
  ORDER BY total_postulantes DESC
""")

sqlGroupBy1.show(15)
println(s"🌍 Grupos de ubicación analizados: ${sqlGroupBy1.count()}")

📂 SQL GROUPBY 1: Estadísticas por departamento, provincia y tipo de comunidad

+-------------+-------------+--------------+-----------------+--------------------+-----------------------+-------+-------+-------------+-------------+
| departamento|    provincia|tipo_comunidad|total_postulantes|carreras_disponibles|modalidades_disponibles|hombres|mujeres|edad_promedio|pago_promedio|
+-------------+-------------+--------------+-----------------+--------------------+-----------------------+-------+-------+-------------+-------------+
|        CUSCO|LA CONVENCIÓN|       NINGUNA|             2794|                   6|                      8|   1629|   1165|        19.85|       189.19|
|        CUSCO|LA CONVENCIÓN|          NULL|              795|                   4|                      5|    496|    299|        20.62|       177.38|
|        CUSCO|        CUSCO|       NINGUNA|              245|                   6|                      8|    166|     79|        19.67|        215.2|
|        

[36msqlGroupBy1[39m: [32mDataFrame[39m = [departamento: string, provincia: string ... 8 more fields]

In [228]:
// SQL GROUPBY 2: Agrupación por perfil académico y demográfico
println("\n📂 SQL GROUPBY 2: Perfiles académicos y demográficos")

val sqlGroupBy2 = spark.sql("""
  SELECT 
    escuela,
    modalidad,
    sexo,
    CASE 
      WHEN edad < 18 THEN 'MENOR_18'
      WHEN edad BETWEEN 18 AND 22 THEN '18_22'
      WHEN edad BETWEEN 23 AND 27 THEN '23_27'
      ELSE 'MAYOR_27'
    END as rango_edad,
    COUNT(*) as cantidad,
    COUNT(CASE WHEN pago > 0 THEN 1 END) as con_pago,
    COUNT(CASE WHEN tiene_discapacidad = 'SI' THEN 1 END) as con_discapacidad,
    ROUND(AVG(CASE WHEN pago > 0 THEN pago END), 2) as pago_promedio,
    MIN(CASE WHEN pago > 0 THEN pago END) as pago_minimo,
    MAX(pago) as pago_maximo
  FROM postulantes 
  WHERE edad IS NOT NULL
  GROUP BY escuela, modalidad, sexo, 
           CASE 
             WHEN edad < 18 THEN 'MENOR_18'
             WHEN edad BETWEEN 18 AND 22 THEN '18_22'
             WHEN edad BETWEEN 23 AND 27 THEN '23_27'
             ELSE 'MAYOR_27'
           END
  HAVING COUNT(*) >= 5
  ORDER BY escuela, modalidad, cantidad DESC
""")

sqlGroupBy2.show(20, truncate = false)
println(s"🎓 Perfiles académico-demográficos: ${sqlGroupBy2.count()}")


📂 SQL GROUPBY 2: Perfiles académicos y demográficos

+------------+--------------------------------------------+---------+----------+--------+--------+----------------+-------------+-----------+-----------+
|escuela     |modalidad                                   |sexo     |rango_edad|cantidad|con_pago|con_discapacidad|pago_promedio|pago_minimo|pago_maximo|
+------------+--------------------------------------------+---------+----------+--------+--------+----------------+-------------+-----------+-----------+
|CONTABILIDAD|ORDINARIO                                   |MASCULINO|18_22     |28      |28      |0               |207.14       |200        |250        |
|CONTABILIDAD|ORDINARIO                                   |FEMENINO |MENOR_18  |27      |27      |0               |201.85       |200        |250        |
|CONTABILIDAD|ORDINARIO                                   |MASCULINO|MENOR_18  |26      |26      |0               |205.77       |200        |250        |
|CONTABILIDAD|ORDINARI

[36msqlGroupBy2[39m: [32mDataFrame[39m = [escuela: string, modalidad: string ... 8 more fields]

In [229]:
// SQL GROUPBY 3: Agrupación por competencias lingüísticas
println("\n📂 SQL GROUPBY 3: Competencias lingüísticas por región")

val sqlGroupBy3 = spark.sql("""
  SELECT 
    departamento,
    idioma,
    lee,
    escribe,
    habla,
    COUNT(*) as total_perfil,
    COUNT(DISTINCT escuela) as carreras_representadas,
    ROUND(AVG(edad), 2) as edad_promedio,
    COUNT(CASE WHEN sexo = 'MASCULINO' THEN 1 END) as hombres,
    COUNT(CASE WHEN sexo = 'FEMENINO' THEN 1 END) as mujeres,
    ROUND((COUNT(CASE WHEN sexo = 'FEMENINO' THEN 1 END) * 100.0 / COUNT(*)), 2) as pct_mujeres,
    CASE 
      WHEN lee = 'SI' AND escribe = 'SI' AND habla = 'SI' THEN 'COMPETENCIA_COMPLETA'
      WHEN lee = 'SI' OR escribe = 'SI' OR habla = 'SI' THEN 'COMPETENCIA_PARCIAL'
      ELSE 'SIN_COMPETENCIA'
    END as nivel_competencia
  FROM postulantes 
  GROUP BY departamento, idioma, lee, escribe, habla
  HAVING COUNT(*) >= 3
  ORDER BY departamento, total_perfil DESC, nivel_competencia
""")

sqlGroupBy3.show(15, truncate = false)
println(s"🗣️ Perfiles lingüísticos regionales: ${sqlGroupBy3.count()}")


📂 SQL GROUPBY 3: Competencias lingüísticas por región

+------------+-------+----+-------+-----+------------+----------------------+-------------+-------+-------+-----------+--------------------+
|departamento|idioma |lee |escribe|habla|total_perfil|carreras_representadas|edad_promedio|hombres|mujeres|pct_mujeres|nivel_competencia   |
+------------+-------+----+-------+-----+------------+----------------------+-------------+-------+-------+-----------+--------------------+
|APURÍMAC    |Español|SI  |SI     |SI   |16          |5                     |24.75        |10     |6      |37.50      |COMPETENCIA_COMPLETA|
|APURÍMAC    |NULL   |NULL|NULL   |NULL |5           |2                     |19.2         |4      |1      |20.00      |SIN_COMPETENCIA     |
|APURÍMAC    |Español|SI  |NO     |NO   |4           |3                     |19.75        |2      |2      |50.00      |COMPETENCIA_PARCIAL |
|AREQUIPA    |Español|SI  |SI     |SI   |21          |6                     |18.67        |10     

[36msqlGroupBy3[39m: [32mDataFrame[39m = [departamento: string, idioma: string ... 10 more fields]

In [230]:
// SQL GROUPBY 4: Agrupación por análisis temporal y pagos
println("\n📂 SQL GROUPBY 4: Análisis temporal de postulaciones y pagos")

val sqlGroupBy4 = spark.sql("""
  SELECT 
    annio_postulacion,
    CASE 
      WHEN pago = 0 OR pago IS NULL THEN 'SIN_PAGO'
      WHEN pago <= 50 THEN 'PAGO_BAJO'
      WHEN pago <= 150 THEN 'PAGO_MEDIO'
      WHEN pago <= 300 THEN 'PAGO_ALTO'
      ELSE 'PAGO_PREMIUM'
    END as categoria_pago,
    modalidad,
    COUNT(*) as total_postulantes,
    COUNT(DISTINCT departamento) as departamentos_origen,
    COUNT(DISTINCT escuela) as carreras_solicitadas,
    ROUND(AVG(edad), 2) as edad_promedio,
    MIN(edad) as edad_minima,
    MAX(edad) as edad_maxima,
    SUM(CASE WHEN pago > 0 THEN pago ELSE 0 END) as ingresos_totales,
    ROUND(AVG(CASE WHEN pago > 0 THEN pago END), 2) as pago_promedio_efectivo
  FROM postulantes 
  WHERE annio_postulacion IS NOT NULL
  GROUP BY annio_postulacion, 
           CASE 
             WHEN pago = 0 OR pago IS NULL THEN 'SIN_PAGO'
             WHEN pago <= 50 THEN 'PAGO_BAJO'
             WHEN pago <= 150 THEN 'PAGO_MEDIO'
             WHEN pago <= 300 THEN 'PAGO_ALTO'
             ELSE 'PAGO_PREMIUM'
           END,
           modalidad
  ORDER BY annio_postulacion DESC, total_postulantes DESC
""")

sqlGroupBy4.show(20, truncate = false)
println(s"📅 Grupos temporales-económicos analizados: ${sqlGroupBy4.count()}")


📂 SQL GROUPBY 4: Análisis temporal de postulaciones y pagos

+-----------------+--------------+-----------------------------------------------------------------+-----------------+--------------------+--------------------+-------------+-----------+-----------+----------------+----------------------+
|annio_postulacion|categoria_pago|modalidad                                                        |total_postulantes|departamentos_origen|carreras_solicitadas|edad_promedio|edad_minima|edad_maxima|ingresos_totales|pago_promedio_efectivo|
+-----------------+--------------+-----------------------------------------------------------------+-----------------+--------------------+--------------------+-------------+-----------+-----------+----------------+----------------------+
|2023             |PAGO_ALTO     |ORDINARIO                                                        |795              |13                  |6                   |19.66        |16         |53         |163400          |205.53

[36msqlGroupBy4[39m: [32mDataFrame[39m = [annio_postulacion: int, categoria_pago: string ... 9 more fields]

In [231]:
// SQL GROUPBY 5: Agrupación por análisis de inclusión y accesibilidad
println("\n📂 SQL GROUPBY 5: Análisis de inclusión y accesibilidad educativa")

val sqlGroupBy5 = spark.sql("""
  SELECT 
    tiene_discapacidad,
    tipo_comunidad,
    modalidad,
    sexo,
    COUNT(*) as total_grupo,
    COUNT(DISTINCT escuela) as carreras_accesibles,
    COUNT(DISTINCT departamento) as regiones_origen,
    ROUND(AVG(edad), 2) as edad_promedio,
    COUNT(CASE WHEN pago > 0 THEN 1 END) as realizaron_pago,
    ROUND((COUNT(CASE WHEN pago > 0 THEN 1 END) * 100.0 / COUNT(*)), 2) as pct_con_pago,
    ROUND(AVG(CASE WHEN pago > 0 THEN pago END), 2) as pago_promedio,
    -- Indicadores de competencia lingüística
    COUNT(CASE WHEN lee = 'SI' AND escribe = 'SI' AND habla = 'SI' THEN 1 END) as competencia_completa,
    COUNT(CASE WHEN lee = 'NO' AND escribe = 'NO' AND habla = 'NO' THEN 1 END) as sin_competencia,
    ROUND((COUNT(CASE WHEN lee = 'SI' AND escribe = 'SI' AND habla = 'SI' THEN 1 END) * 100.0 / COUNT(*)), 2) as pct_competencia_completa,
    -- Categorizar grupo
    CASE 
      WHEN tiene_discapacidad = 'SI' THEN 'GRUPO_PRIORITARIO'
      WHEN tipo_comunidad IN ('RURAL', 'COMUNIDAD NATIVA') THEN 'GRUPO_VULNERABLE'
      ELSE 'GRUPO_REGULAR'
    END as categoria_inclusion
  FROM postulantes 
  GROUP BY tiene_discapacidad, tipo_comunidad, modalidad, sexo
  HAVING COUNT(*) >= 2  -- Grupos con al menos 2 personas
  ORDER BY 
    CASE 
      WHEN tiene_discapacidad = 'SI' THEN 1
      WHEN tipo_comunidad IN ('RURAL', 'COMUNIDAD NATIVA') THEN 2
      ELSE 3
    END,
    total_grupo DESC
""")

sqlGroupBy5.show(15, truncate = false)
println(s"♿ Grupos de inclusión analizados: ${sqlGroupBy5.count()}")
println("🌐 Análisis enfocado en equidad e inclusión educativa")


📂 SQL GROUPBY 5: Análisis de inclusión y accesibilidad educativa

+------------------+--------------+-----------------------------------------------------------------+---------+-----------+-------------------+---------------+-------------+---------------+------------+-------------+--------------------+---------------+------------------------+-------------------+
|tiene_discapacidad|tipo_comunidad|modalidad                                                        |sexo     |total_grupo|carreras_accesibles|regiones_origen|edad_promedio|realizaron_pago|pct_con_pago|pago_promedio|competencia_completa|sin_competencia|pct_competencia_completa|categoria_inclusion|
+------------------+--------------+-----------------------------------------------------------------+---------+-----------+-------------------+---------------+-------------+---------------+------------+-------------+--------------------+---------------+------------------------+-------------------+
|SI                |NINGUNA       |O

[36msqlGroupBy5[39m: [32mDataFrame[39m = [tiene_discapacidad: string, tipo_comunidad: string ... 13 more fields]

### 🔄 SUBSECCIÓN: CINCO CONSULTAS ORDERBY CON OPERADORES

In [232]:
// SQL ORDERBY 1: Ordenamiento con filtros y cálculos complejos
println("🔄 SQL ORDERBY 1: Ranking de postulantes por métricas combinadas")

val sqlOrderBy1 = spark.sql("""
  SELECT 
    uuid,
    escuela,
    departamento,
    modalidad,
    edad,
    pago,
    -- Cálculos para ranking
    (pago * 0.4 + edad * 0.3 + 
     CASE WHEN sexo = 'FEMENINO' THEN 10 ELSE 0 END +
     CASE WHEN tiene_discapacidad = 'SI' THEN 15 ELSE 0 END) as puntaje_inclusion,
    -- Categorización
    CASE 
      WHEN pago > 200 AND edad < 25 THEN 'PREMIUM_JOVEN'
      WHEN pago > 150 THEN 'ALTO_VALOR'
      WHEN tiene_discapacidad = 'SI' OR tipo_comunidad = 'RURAL' THEN 'PRIORITARIO'
      ELSE 'REGULAR'
    END as categoria
  FROM postulantes 
  WHERE pago IS NOT NULL AND edad IS NOT NULL
    AND pago > 0
    AND departamento IN ('LIMA', 'AREQUIPA', 'CUSCO', 'PIURA')
  ORDER BY 
    categoria ASC,
    puntaje_inclusion DESC,
    pago DESC,
    edad ASC
  LIMIT 25
""")

sqlOrderBy1.show(truncate = false)
println(s"🏆 Top postulantes por criterios combinados mostrados")

🔄 SQL ORDERBY 1: Ranking de postulantes por métricas combinadas

+----------------------------------------------------------------+------------------------------+------------+------------------------------------+----+----+-----------------+----------+
|uuid                                                            |escuela                       |departamento|modalidad                           |edad|pago|puntaje_inclusion|categoria |
+----------------------------------------------------------------+------------------------------+------------+------------------------------------+----+----+-----------------+----------+
|4c7d00d8349d30f5dceb55fd315112773e0a77947846a685e270d1ca67258789|INGENIERÍA CIVIL              |CUSCO       |EXTRAORDINARIO-GRADUADOS Y TITULADOS|44  |670 |281.2            |ALTO_VALOR|
|b1ce2631353d05962206e90adcd555514342e0ba9ffa545c6474875efefe270c|INGENIERÍA CIVIL              |CUSCO       |EXTRAORDINARIO-GRADUADOS Y TITULADOS|36  |670 |278.8            |ALTO_VALOR|


[36msqlOrderBy1[39m: [32mDataFrame[39m = [uuid: string, escuela: string ... 6 more fields]

In [233]:
// SQL ORDERBY 2: Ordenamiento con funciones de ventana y percentiles
println("\n🔄 SQL ORDERBY 2: Análisis de percentiles y rankings por carrera")

val sqlOrderBy2 = spark.sql("""
  SELECT 
    uuid,
    escuela,
    pago,
    edad,
    -- Rankings por carrera
    ROW_NUMBER() OVER (PARTITION BY escuela ORDER BY pago DESC) as rank_pago_carrera,
    DENSE_RANK() OVER (PARTITION BY escuela ORDER BY edad ASC) as rank_edad_carrera,
    -- Percentiles
    PERCENT_RANK() OVER (ORDER BY pago) as percentil_pago_general,
    NTILE(10) OVER (ORDER BY pago) as decil_pago,
    -- Comparaciones con promedios
    AVG(pago) OVER (PARTITION BY escuela) as pago_prom_carrera,
    pago - AVG(pago) OVER (PARTITION BY escuela) as diferencia_promedio,
    -- Valores anterior y siguiente
    LAG(pago, 1) OVER (PARTITION BY escuela ORDER BY pago DESC) as pago_anterior,
    LEAD(pago, 1) OVER (PARTITION BY escuela ORDER BY pago DESC) as pago_siguiente
  FROM postulantes 
  WHERE pago IS NOT NULL AND pago > 0 AND edad IS NOT NULL
  ORDER BY 
    percentil_pago_general DESC,
    escuela ASC,
    rank_pago_carrera ASC
  LIMIT 30
""")

sqlOrderBy2.show(20, truncate = false)
println(s"📈 Análisis de percentiles y rankings completado")


🔄 SQL ORDERBY 2: Análisis de percentiles y rankings por carrera

+----------------------------------------------------------------+------------------------------+----+----+-----------------+-----------------+----------------------+----------+------------------+-------------------+-------------+--------------+
|uuid                                                            |escuela                       |pago|edad|rank_pago_carrera|rank_edad_carrera|percentil_pago_general|decil_pago|pago_prom_carrera |diferencia_promedio|pago_anterior|pago_siguiente|
+----------------------------------------------------------------+------------------------------+----+----+-----------------+-----------------+----------------------+----------+------------------+-------------------+-------------+--------------+
|10c78e221144ae1a416374cba76d72cda935f7c28dccf0103aa625743e273093|INGENIERÍA AGRONÓMICA TROPICAL|670 |23  |1                |9                |0.9983596473241747    |10        |174.1032428855063 |

[36msqlOrderBy2[39m: [32mDataFrame[39m = [uuid: string, escuela: string ... 10 more fields]

In [234]:
// SQL ORDERBY 3: Ordenamiento con agregaciones, filtros HAVING y subconsultas
println("\n🔄 SQL ORDERBY 3: Carreras ordenadas por competitividad y diversidad")

val sqlOrderBy3 = spark.sql("""
  SELECT 
    escuela,
    COUNT(*) as total_postulantes,
    COUNT(DISTINCT departamento) as diversidad_geografica,
    COUNT(DISTINCT modalidad) as modalidades_disponibles,
    ROUND(AVG(pago), 2) as pago_promedio,
    ROUND(STDDEV(pago), 2) as desviacion_pago,
    COUNT(CASE WHEN sexo = 'FEMENINO' THEN 1 END) as mujeres,
    COUNT(CASE WHEN tiene_discapacidad = 'SI' THEN 1 END) as con_discapacidad,
    ROUND((COUNT(CASE WHEN sexo = 'FEMENINO' THEN 1 END) * 100.0 / COUNT(*)), 2) as pct_mujeres,
    ROUND((COUNT(CASE WHEN tiene_discapacidad = 'SI' THEN 1 END) * 100.0 / COUNT(*)), 2) as pct_discapacidad,
    -- Índice de competitividad (más postulantes = más competitivo)
    ROUND((COUNT(*) * 0.4 + 
           COUNT(DISTINCT departamento) * 0.3 + 
           AVG(pago) * 0.003), 2) as indice_competitividad
  FROM postulantes 
  WHERE pago IS NOT NULL AND pago > 0
  GROUP BY escuela
  HAVING COUNT(*) >= 50  -- Solo carreras con mínimo 50 postulantes
    AND COUNT(DISTINCT departamento) >= 3  -- Al menos 3 departamentos
    AND AVG(pago) > (SELECT AVG(pago) * 0.8 FROM postulantes WHERE pago > 0)  -- Pago promedio decente
  ORDER BY 
    indice_competitividad DESC,
    diversidad_geografica DESC,
    pct_mujeres DESC,
    total_postulantes DESC
""")

sqlOrderBy3.show(truncate = false)
println(s"🎓 Carreras analizadas por competitividad y diversidad")


🔄 SQL ORDERBY 3: Carreras ordenadas por competitividad y diversidad

+------------------------------+-----------------+---------------------+-----------------------+-------------+---------------+-------+----------------+-----------+----------------+---------------------+
|escuela                       |total_postulantes|diversidad_geografica|modalidades_disponibles|pago_promedio|desviacion_pago|mujeres|con_discapacidad|pct_mujeres|pct_discapacidad|indice_competitividad|
+------------------------------+-----------------+---------------------+-----------------------+-------------+---------------+-------+----------------+-----------+----------------+---------------------+
|INGENIERÍA CIVIL              |1794             |18                   |8                      |189.85       |60.84          |469    |12              |26.14      |0.67            |723.57               |
|INGENIERÍA AGRONÓMICA TROPICAL|1511             |14                   |8                      |174.1        |61.75   

[36msqlOrderBy3[39m: [32mDataFrame[39m = [escuela: string, total_postulantes: bigint ... 9 more fields]

In [235]:
// SQL ORDERBY 4: Ordenamiento con CASE WHEN y operadores lógicos complejos
println("\n🔄 SQL ORDERBY 4: Clasificación de postulantes por perfil integral")

val sqlOrderBy4 = spark.sql("""
  SELECT 
    uuid,
    departamento,
    escuela,
    modalidad,
    sexo,
    edad,
    pago,
    tiene_discapacidad,
    tipo_comunidad,
    -- Puntaje de prioridad social
    CASE 
      WHEN tiene_discapacidad = 'SI' THEN 100
      WHEN tipo_comunidad IN ('RURAL', 'COMUNIDAD NATIVA') THEN 80
      WHEN sexo = 'FEMENINO' AND modalidad LIKE '%OPORTUNIDAD%' THEN 60
      WHEN departamento NOT IN ('LIMA', 'AREQUIPA', 'CUSCO') THEN 40
      ELSE 20
    END as puntaje_prioridad,
    -- Puntaje económico (inverso: menor pago = mayor puntaje)
    CASE 
      WHEN pago IS NULL OR pago = 0 THEN 100
      WHEN pago <= 50 THEN 80
      WHEN pago <= 100 THEN 60
      WHEN pago <= 200 THEN 40
      ELSE 20
    END as puntaje_economico,
    -- Puntaje académico
    CASE 
      WHEN modalidad = 'EXONERADO' THEN 100
      WHEN modalidad LIKE '%OPORTUNIDAD%' AND edad <= 20 THEN 80
      WHEN modalidad LIKE '%OPORTUNIDAD%' THEN 60
      WHEN modalidad = 'EXTRAORDINARIO' THEN 40
      ELSE 20
    END as puntaje_academico,
    -- Competencia lingüística
    CASE 
      WHEN lee = 'SI' AND escribe = 'SI' AND habla = 'SI' THEN 100
      WHEN (lee = 'SI' AND escribe = 'SI') OR (lee = 'SI' AND habla = 'SI') OR (escribe = 'SI' AND habla = 'SI') THEN 70
      WHEN lee = 'SI' OR escribe = 'SI' OR habla = 'SI' THEN 40
      ELSE 10
    END as puntaje_linguistico
  FROM postulantes 
  WHERE edad IS NOT NULL
  ORDER BY 
    -- Prioridad 1: Casos especiales de inclusión
    CASE 
      WHEN tiene_discapacidad = 'SI' AND tipo_comunidad = 'RURAL' THEN 1
      WHEN tiene_discapacidad = 'SI' THEN 2
      WHEN tipo_comunidad IN ('RURAL', 'COMUNIDAD NATIVA') THEN 3
      ELSE 4
    END ASC,
    -- Prioridad 2: Puntaje combinado (más alto = mejor)
    (puntaje_prioridad + puntaje_economico + puntaje_academico + puntaje_linguistico) DESC,
    -- Prioridad 3: Criterios de desempate
    pago ASC NULLS FIRST,
    edad ASC,
    departamento ASC
  LIMIT 30
""")

sqlOrderBy4.show(25, truncate = false)
println(s"⚖️ Postulantes clasificados por perfil integral de inclusión")


🔄 SQL ORDERBY 4: Clasificación de postulantes por perfil integral

+----------------------------------------------------------------+------------+------------------------------+-----------------------------------------------------------------+---------+----+----+------------------+--------------+-----------------+-----------------+-----------------+-------------------+
|uuid                                                            |departamento|escuela                       |modalidad                                                        |sexo     |edad|pago|tiene_discapacidad|tipo_comunidad|puntaje_prioridad|puntaje_economico|puntaje_academico|puntaje_linguistico|
+----------------------------------------------------------------+------------+------------------------------+-----------------------------------------------------------------+---------+----+----+------------------+--------------+-----------------+-----------------+-----------------+-------------------+
|a0cf09f29feeeb8e

[36msqlOrderBy4[39m: [32mDataFrame[39m = [uuid: string, departamento: string ... 11 more fields]

In [236]:
// SQL ORDERBY 5: Ordenamiento con UNION, intersecciones y operaciones de conjunto
println("\n🔄 SQL ORDERBY 5: Análisis comparativo con operaciones de conjunto")

val sqlOrderBy5 = spark.sql("""
  -- Unión de diferentes segmentos de postulantes con ordenamiento global
  (
    SELECT 
      'ALTO_RENDIMIENTO' as segmento,
      uuid,
      escuela,
      departamento,
      pago,
      edad,
      'Pago superior a 250 soles' as criterio,
      1 as prioridad_orden,
      pago as orden_valor
    FROM postulantes 
    WHERE pago > 250
  )
  UNION ALL
  (
    SELECT 
      'INCLUSION_SOCIAL' as segmento,
      uuid,
      escuela,
      departamento,
      pago,
      edad,
      'Discapacidad o comunidad rural' as criterio,
      2 as prioridad_orden,
      CASE WHEN tiene_discapacidad = 'SI' THEN 1 ELSE 2 END as orden_valor
    FROM postulantes 
    WHERE tiene_discapacidad = 'SI' OR tipo_comunidad = 'RURAL'
  )
  UNION ALL
  (
    SELECT 
      'JOVENES_TALENTO' as segmento,
      uuid,
      escuela,
      departamento,
      pago,
      edad,
      'Jóvenes de 16-19 años con modalidad oportunidad' as criterio,
      3 as prioridad_orden,
      edad as orden_valor
    FROM postulantes 
    WHERE edad BETWEEN 16 AND 19 
      AND modalidad LIKE '%OPORTUNIDAD%'
      AND pago > 100
  )
  UNION ALL
  (
    SELECT 
      'DIVERSIDAD_GEOGRAFICA' as segmento,
      uuid,
      escuela,
      departamento,
      pago,
      edad,
      'Departamentos con menor representación' as criterio,
      4 as prioridad_orden,
      pago as orden_valor
    FROM postulantes 
    WHERE departamento IN (
      SELECT departamento 
      FROM postulantes 
      GROUP BY departamento 
      HAVING COUNT(*) < 100
    )
    AND pago IS NOT NULL AND pago > 0
  )
  ORDER BY 
    prioridad_orden ASC,
    CASE 
      WHEN segmento = 'ALTO_RENDIMIENTO' THEN orden_valor 
      WHEN segmento = 'INCLUSION_SOCIAL' THEN orden_valor
      WHEN segmento = 'JOVENES_TALENTO' THEN orden_valor
      WHEN segmento = 'DIVERSIDAD_GEOGRAFICA' THEN orden_valor
    END ASC,
    uuid ASC
  LIMIT 40
""")

sqlOrderBy5.show(30, truncate = false)
println(s"🌍 Análisis multisegmento con operaciones de conjunto completado")


🔄 SQL ORDERBY 5: Análisis comparativo con operaciones de conjunto

+----------------+----------------------------------------------------------------+------------------------------+------------+----+----+------------------------------+---------------+-----------+
|segmento        |uuid                                                            |escuela                       |departamento|pago|edad|criterio                      |prioridad_orden|orden_valor|
+----------------+----------------------------------------------------------------+------------------------------+------------+----+----+------------------------------+---------------+-----------+
|ALTO_RENDIMIENTO|57bd78e7a875508362b4b169b2ad8d15e986168c3eaf10a4062654b8fbb9324a|INGENIERÍA AGRONÓMICA TROPICAL|CUSCO       |300 |27  |Pago superior a 250 soles     |1              |300        |
|ALTO_RENDIMIENTO|83cdd031386e5bac7c2270ee4054ca9ab7ed641deef07c5d636ca790287f10d1|INGENIERÍA AGRONÓMICA TROPICAL|CUSCO       |300 |21  |Pago su

[36msqlOrderBy5[39m: [32mDataFrame[39m = [segmento: string, uuid: string ... 7 more fields]