Inicié mi entorno de análisis utilizando Apache Spark. Primero, importé las librerías necesarias —en este caso, SparkSession y DataFrame— que son fundamentales para trabajar con datos en Spark.

Después, creé una instancia de SparkSession, que es el punto de entrada principal para trabajar con DataFrames y ejecutar consultas en Spark. Le asigné un nombre a la aplicación: "WorldBank Education Analysis", para mantener organizado mi entorno y facilitar la identificación del proceso si necesito monitorearlo.

Una vez creado el SparkSession, leí los archivos de datos en formato Parquet que se encontraban en una ruta de Google Cloud Storage (gs://scala-spark-datos/education_data/*.parquet). Esta lectura generó un DataFrame llamado worldBankDF, que contiene toda la información educativa del Banco Mundial que voy a analizar.

Para entender la estructura del conjunto de datos, utilicé el método .printSchema() que me permitió ver las columnas disponibles y sus tipos de datos. Finalmente, visualicé las primeras 5 filas con .show(5), lo cual me ayudó a hacer una inspección inicial de los datos y confirmar que todo se hubiera cargado correctamente.

In [5]:
import org.apache.spark.sql.{SparkSession, DataFrame}
val spark = SparkSession.builder()
  .appName("WorldBank Education Analysis")
  .getOrCreate()
val worldBankDF = spark.read
  .parquet("gs://scala-spark-datos/education_data/*.parquet")
worldBankDF.printSchema()
worldBankDF.show(5)

root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- indicator_name: string (nullable = true)
 |-- indicator_code: string (nullable = true)
 |-- value: double (nullable = true)
 |-- year: long (nullable = true)

+------------+------------+--------------------+--------------+--------+----+
|country_name|country_code|      indicator_name|indicator_code|   value|year|
+------------+------------+--------------------+--------------+--------+----+
|        Chad|         TCD|Enrolment in lowe...|       UIS.E.2|321921.0|2012|
|        Chad|         TCD|Enrolment in uppe...|       UIS.E.3| 68809.0|2006|
|        Chad|         TCD|Enrolment in uppe...|       UIS.E.3| 30551.0|1999|
|        Chad|         TCD|Enrolment in uppe...|       UIS.E.3| 79784.0|2007|
|        Chad|         TCD|Repeaters in prim...|       UIS.R.1|282699.0|2006|
+------------+------------+--------------------+--------------+--------+----+
only showing top 5 rows



import org.apache.spark.sql.{SparkSession, DataFrame}
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2bf433eb
worldBankDF: org.apache.spark.sql.DataFrame = [country_name: string, country_code: string ... 4 more fields]


topCountries: Array[org.apache.spark.sql.Row] = Array()


En este ejercicio comencé explorando el conjunto de datos del Banco Mundial para entender qué información tenía disponible. Primero, consulté los años únicos presentes en el dataset para saber qué períodos podía analizar. Luego hice lo mismo con los códigos de indicadores, ya que me interesaba enfocar el análisis en uno específico: la tasa de matriculación primaria, cuyo código es SE.PRM.ENRR.

Una vez identificado el indicador de interés, filtré los datos para quedarme solo con ese y luego agrupé por año para contar cuántos registros tenía por cada uno. Ordené los resultados de forma descendente para que el año más reciente apareciera de primero. Así pude identificar automáticamente cuál era el año más reciente con datos disponibles, y lo guardé en una variable llamada latestYear.

Con esa información, modifiqué mi consulta original —que usaba un año fijo (2015)— para que tomara dinámicamente el año más reciente. Luego filtré el dataset usando ese año y el indicador de matriculación primaria, ordené los países por el valor de la tasa en orden descendente, seleccioné los 10 primeros, y extraje sus nombres y valores.

In [9]:
worldBankDF.select("year").distinct().orderBy("year").show(100)

worldBankDF.select("indicator_code").distinct().show(100)

val availableData = worldBankDF
  .filter($"indicator_code" === "SE.PRM.ENRR")
  .groupBy("year")
  .count()
  .orderBy(desc("year"))

availableData.show()


val latestYear = availableData.first().getAs[Long]("year")
println(s"El año más reciente disponible es: $latestYear")


val topCountriesUpdated = worldBankDF
  .filter($"indicator_code" === "SE.PRM.ENRR" && $"year" === latestYear)
  .orderBy(desc("value"))
  .limit(10)
  .select("country_name", "value")
  .collect()

println(s"Top 10 países por tasa de matriculación primaria ($latestYear):")
topCountriesUpdated.foreach { row =>
  val country = row.getAs[String]("country_name")
  val value = row.getAs[Double]("value")
  val stars = "*" * (value / 10).toInt
  println(f"$country%-20s $value%7.2f $stars")
}

+----+
|year|
+----+
|1970|
|1971|
|1972|
|1973|
|1974|
|1975|
|1976|
|1977|
|1978|
|1979|
|1980|
|1981|
|1982|
|1983|
|1984|
|1985|
|1986|
|1987|
|1988|
|1989|
|1990|
|1991|
|1992|
|1993|
|1994|
|1995|
|1996|
|1997|
|1998|
|1999|
|2000|
|2001|
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
+----+

+--------------+
|indicator_code|
+--------------+
|   SE.COM.DURS|
|    UIS.E.1.G2|
|  UIS.NIRA.1.M|
| UIS.LR.AG65.M|
|  UIS.REPR.1.M|
|  UIS.UAEP.1.F|
| UIS.ROFST.2.M|
|   UIS.GOER.56|
|  UIS.R.1.G4.F|
|  UIS.UAEP.1.M|
| HH.DHS.OOST.X|
| UIS.E.3.GPV.F|
|    UIS.E.1.PR|
|   UIS.THDUR.0|
|  UIS.SLE.12.M|
|SE.PRM.ENRL.FE|
|SE.TER.ENRR.FE|
|   UIS.LR.AG65|
|  UIS.NERA.2.F|
|   UIS.PTRHC.3|
|SL.TLF.TOTL.IN|
| UIS.REPP.1.G6|
|   UIS.NE.1.G1|
|  UIS.TRTP.0.F|
| UIS.ROFST.1.M|
|SL.UEM.TOTL.ZS|
|    UIS.SLE.56|
|   UIS.SLE.4.F|
|   UIS.XPUBP.3|
|SE.PRE.ENRR.MA|
|   UIS.DR.1.G5|
|HH.MICS.SCR.Q2|
|SE.SEC.PRIV.ZS|
| UIS.REPR.1.G4|
|HH.DHS.OOS.1

availableData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [year: bigint, count: bigint]
latestYear: Long = 2010
topCountriesUpdated: Array[org.apache.spark.sql.Row] = Array([Chad,82.6077194213867])


Ejercicio 1 adaptado: Trabajar con el DataFrame de educación del Banco Mundial
1. Mostrar el esquema del DataFrame (equivale a mostrar el esquema de estudiantes)

In [10]:
println("Esquema del DataFrame:")
worldBankDF.printSchema()

Esquema del DataFrame:
root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- indicator_name: string (nullable = true)
 |-- indicator_code: string (nullable = true)
 |-- value: double (nullable = true)
 |-- year: long (nullable = true)



2. Filtrar registros con un valor mayor a 80 (equivalente a filtrar estudiantes con calificación > 8, adaptado a escala 0-100)

In [11]:
val altosValoresDF = worldBankDF.filter($"value" > 80)
println("Registros con valor > 80:")
altosValoresDF.show(10)

Registros con valor > 80:
+------------+------------+--------------------+--------------+--------+----+
|country_name|country_code|      indicator_name|indicator_code|   value|year|
+------------+------------+--------------------+--------------+--------+----+
|        Chad|         TCD|Enrolment in lowe...|       UIS.E.2|321921.0|2012|
|        Chad|         TCD|Enrolment in uppe...|       UIS.E.3| 68809.0|2006|
|        Chad|         TCD|Enrolment in uppe...|       UIS.E.3| 30551.0|1999|
|        Chad|         TCD|Enrolment in uppe...|       UIS.E.3| 79784.0|2007|
|        Chad|         TCD|Repeaters in prim...|       UIS.R.1|282699.0|2006|
|        Chad|         TCD|Repeaters in prim...|       UIS.R.1|169600.0|1991|
|        Chad|         TCD|Repeaters in prim...|       UIS.R.1| 79342.0|1977|
|        Chad|         TCD|Repeaters in prim...|       UIS.R.1|251163.0|2001|
|        Chad|         TCD|Repeaters in prim...|       UIS.R.1|224397.0|2000|
|        Chad|         TCD|Teachers in

altosValoresDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country_name: string, country_code: string ... 4 more fields]


3. Seleccionar nombres de países y ordenarlos por valor (calificación) de forma descendente
(equivalente a ordenar nombres de estudiantes por calificación)

In [12]:
val paisesPorValor = worldBankDF
  .select("country_name", "indicator_name", "value", "year")
  .orderBy(desc("value")) 
println("Países ordenados por valor (descendente):")
paisesPorValor.show(10)

Países ordenados por valor (descendente):
+------------+--------------------+-------------------+----+
|country_name|      indicator_name|              value|year|
+------------+--------------------+-------------------+----+
|        Chad|GDP at market pri...|1.23680710387362E10|2012|
|        Chad|GDP at market pri...|1.21563804250825E10|2011|
|        Chad|GDP at market pri...| 1.0666537555594E10|2011|
|        Chad|GDP at market pri...|1.03519326044154E10|2008|
|        Chad|   GNI (current US$)|1.03023488371154E10|2010|
|        Chad|GDP at market pri...| 9.00605773352815E9|2008|
|        Chad|GDP at market pri...| 8.40792103468586E9|2005|
|        Chad|GDP at market pri...| 3.21482886992383E9|1989|
|        Chad|GDP at market pri...| 2.28465807624205E9|1972|
|        Chad|GDP at market pri...|  2.2808611566978E9|1983|
+------------+--------------------+-------------------+----+
only showing top 10 rows



paisesPorValor: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country_name: string, indicator_name: string ... 2 more fields]


Ejercicio 2: UDF (User Defined Function)

Pregunta: Define una función que determine si un número es par o impar. Aplica esta función a una columna de un DataFrame que contenga una lista de números.


En este ejercicio trabajé con una función definida por mí (una UDF) para evaluar valores educativos en relación con un umbral que establecí previamente. El objetivo fue clasificar cada valor como "por encima" o "por debajo" del umbral, facilitando así el análisis de los datos.

Primero creé una función personalizada que toma un valor educativo como entrada y lo compara con un umbral. Según el resultado de esa comparación, la función devuelve una etiqueta indicando si el valor está por encima o por debajo del umbral. Luego apliqué esta función a la columna value de mi DataFrame. Esto me permitió evaluar cada fila del conjunto de datos de manera individual, asignando la categoría correspondiente según el resultado de la comparación.
Luego podemos mirar el DataFrame resultante, que ahora incluye una nueva columna con la clasificación generada por la UDF. 

In [13]:
import org.apache.spark.sql.functions.{udf, col}
val evaluateValueUDF = udf((value: Double) => {if (value > 50000.0) "Alto" else "Bajo"})
val resultDF = worldBankDF.withColumn("nivel_valor", evaluateValueUDF(col("value")))
resultDF.select("country_name", "indicator_name", "value", "nivel_valor", "year").show(10)

+------------+--------------------+--------+-----------+----+
|country_name|      indicator_name|   value|nivel_valor|year|
+------------+--------------------+--------+-----------+----+
|        Chad|Enrolment in lowe...|321921.0|       Alto|2012|
|        Chad|Enrolment in uppe...| 68809.0|       Alto|2006|
|        Chad|Enrolment in uppe...| 30551.0|       Bajo|1999|
|        Chad|Enrolment in uppe...| 79784.0|       Alto|2007|
|        Chad|Repeaters in prim...|282699.0|       Alto|2006|
|        Chad|Repeaters in prim...|169600.0|       Alto|1991|
|        Chad|Repeaters in prim...| 79342.0|       Alto|1977|
|        Chad|Repeaters in prim...|251163.0|       Alto|2001|
|        Chad|Repeaters in prim...|224397.0|       Alto|2000|
|        Chad|Teachers in lower...|  2703.0|       Bajo|2000|
+------------+--------------------+--------+-----------+----+
only showing top 10 rows



import org.apache.spark.sql.functions.{udf, col}
evaluateValueUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction($Lambda$3876/1787500908@4a507d4d,StringType,Some(List(DoubleType)))
resultDF: org.apache.spark.sql.DataFrame = [country_name: string, country_code: string ... 5 more fields]


Ejercicio 3: Joins y agregaciones
  Pregunta: Dado dos DataFrames,
            uno con información de estudiantes (id, nombre)
            y otro con calificaciones (id_estudiante, asignatura, calificacion),
            realiza un join entre ellos y calcula el promedio de calificaciones por estudiante.

En este ejercicio adapté la lógica original —que consistía en unir un DataFrame de estudiantes con otro de calificaciones para calcular promedios por estudiante— al contexto del dataset educativo del Banco Mundial que ya tenía cargado. En lugar de estudiantes y calificaciones, trabajé con países e indicadores educativos.

Primero, creé un DataFrame llamado countriesDF que contiene información única por país, específicamente el nombre y el código del país. Esto lo hice a partir del worldBankDF, seleccionando las columnas country_name y country_code y eliminando duplicados con .distinct(). Este paso simula el DataFrame de estudiantes del ejercicio original, solo que aquí los “estudiantes” son países.

Luego, preparé otro DataFrame llamado indicatorsDF, donde trabajé con los indicadores educativos como si fueran “calificaciones”. Seleccioné el código del país, el nombre del indicador, su valor y el año, y filtré aquellos registros en los que el valor no fuera nulo. Esto representa las “notas” que cada país ha recibido en distintos indicadores a lo largo del tiempo.

A continuación, realicé un join interno entre los dos DataFrames usando el código del país como clave de unión. Esto me permitió combinar la información general de los países con sus respectivos indicadores educativos en un solo DataFrame (joinedDF), de manera similar a cómo en el ejercicio original se unían estudiantes con sus calificaciones.

Después de tener los datos combinados, calculé el promedio de los indicadores por país. Para ello, agrupé por country_name y country_code, y apliqué funciones de agregación: saqué el promedio de los valores (avg) y conté cuántos indicadores tenía cada país. Ordené los resultados de forma descendente para ver qué países tenían, en promedio, mejores valores educativos.

In [17]:
val countriesDF = worldBankDF
  .select("country_name", "country_code") //DataFrame con información resumida por país
  .distinct()

val indicatorsDF = worldBankDF
  .select("country_code", "indicator_name", "value", "year") // indicadores educativos como si fueran "calificaciones"
  .filter(col("value").isNotNull)

val joinedDF = countriesDF.join(indicatorsDF, Seq("country_code"), "inner")

val avgByCountryDF = joinedDF
  .groupBy("country_name", "country_code")
  .agg(
    avg("value").alias("promedio_indicadores"),
    count("indicator_name").alias("total_indicadores")
  )
  .orderBy(desc("promedio_indicadores"))

avgByCountryDF.show(10)

+------------+------------+--------------------+-----------------+
|country_name|country_code|promedio_indicadores|total_indicadores|
+------------+------------+--------------------+-----------------+
|        Chad|         TCD| 9.660887661559488E7|             1000|
+------------+------------+--------------------+-----------------+



countriesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country_name: string, country_code: string]
indicatorsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country_code: string, indicator_name: string ... 2 more fields]
joinedDF: org.apache.spark.sql.DataFrame = [country_code: string, country_name: string ... 3 more fields]
avgByCountryDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country_name: string, country_code: string ... 2 more fields]


Ejercicio 4: Uso de RDDs
Pregunta: Crea un RDD a partir de una lista de palabras y cuenta la cantidad de ocurrencias de cada palabra.

Primero, tomé el DataFrame original (worldBankDF) y seleccioné únicamente la columna indicator_name, que contiene el nombre de cada indicador educativo. Luego lo convertí a un RDD utilizando .rdd y lo transformé en una lista de strings con .map, extrayendo cada nombre de indicador. Así obtuve un RDD plano con todos los nombres de indicadores presentes en el conjunto de datos.

Con este RDD, apliqué un conteo de frecuencia para saber cuántas veces aparece cada indicador en el dataset. Para eso, mapeé cada indicador a una tupla (indicador, 1) y luego apliqué reduceByKey para sumar las ocurrencias. Posteriormente, ordené los resultados en orden descendente de frecuencia usando sortBy.

Una vez obtenidas las frecuencias, convertí el RDD resultante de vuelta a un DataFrame (indicatorCountsDF) usando toDF, lo cual me permitió visualizar los resultados de forma más cómoda con .show(). Así pude identificar fácilmente los indicadores educativos más frecuentes en los datos del Banco Mundial.

Además, repetí exactamente el mismo procedimiento pero aplicado a los países. Seleccioné la columna country_name, la convertí en un RDD de strings, conté las ocurrencias por país, ordené los resultados, y los transformé en un nuevo DataFrame (countryCountsDF). Esto me dio una vista clara de cuáles países tienen más registros educativos en el conjunto de datos, lo que puede indicar una mejor cobertura o una mayor disponibilidad histórica de datos.

In [16]:
val indicatorsRDD = worldBankDF
  .select("indicator_name")
  .rdd
  .map(row => row.getString(0))

val indicatorCountsRDD = indicatorsRDD
  .map(indicator => (indicator, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)

val indicatorCountsDF = indicatorCountsRDD.toDF("indicator_name", "count")
indicatorCountsDF.show(10, false)

+---------------------------------------------------------------------------------+-----+
|indicator_name                                                                   |count|
+---------------------------------------------------------------------------------+-----+
|Population of the official entrance age to primary education, both sexes (number)|13   |
|GNI (current US$)                                                                |10   |
|Mortality rate, under-5 (per 1,000)                                              |10   |
|Theoretical duration of upper secondary education (years)                        |10   |
|Theoretical duration of primary education (years)                                |9    |
|Gross enrolment ratio, primary and secondary, female (%)                         |9    |
|Drop-out rate from Grade 5 of primary education, female (%)                      |8    |
|Enrolment in secondary general, both sexes (number)                              |8    |
|Gross enr

indicatorsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[74] at map at <console>:35
indicatorCountsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[79] at sortBy at <console>:40
indicatorCountsDF: org.apache.spark.sql.DataFrame = [indicator_name: string, count: int]


In [19]:
val ventasDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("gs://scala-spark-datos/ventas.csv")

ventasDF: org.apache.spark.sql.DataFrame = [id_venta: int, id_producto: int ... 2 more fields]


In [20]:
ventasDF.printSchema()
ventasDF.show(5)

root
 |-- id_venta: integer (nullable = true)
 |-- id_producto: integer (nullable = true)
 |-- cantidad: integer (nullable = true)
 |-- precio_unitario: double (nullable = true)

+--------+-----------+--------+---------------+
|id_venta|id_producto|cantidad|precio_unitario|
+--------+-----------+--------+---------------+
|       1|        101|       5|           20.0|
|       2|        102|       3|           15.0|
|       3|        101|       2|           20.0|
|       4|        103|       7|           10.0|
|       5|        102|       4|           15.0|
+--------+-----------+--------+---------------+
only showing top 5 rows



In [21]:
import org.apache.spark.sql.functions._

val ventasConIngresoDF = ventasDF
  .withColumn("ingreso_total", col("cantidad") * col("precio_unitario"))

import org.apache.spark.sql.functions._
ventasConIngresoDF: org.apache.spark.sql.DataFrame = [id_venta: int, id_producto: int ... 3 more fields]


In [22]:
ventasConIngresoDF.show(5)

+--------+-----------+--------+---------------+-------------+
|id_venta|id_producto|cantidad|precio_unitario|ingreso_total|
+--------+-----------+--------+---------------+-------------+
|       1|        101|       5|           20.0|        100.0|
|       2|        102|       3|           15.0|         45.0|
|       3|        101|       2|           20.0|         40.0|
|       4|        103|       7|           10.0|         70.0|
|       5|        102|       4|           15.0|         60.0|
+--------+-----------+--------+---------------+-------------+
only showing top 5 rows



In [23]:
val ingresosPorProductoDF = ventasConIngresoDF
  .groupBy("id_producto")
  .agg(
    sum("ingreso_total").alias("ingreso_total"),
    sum("cantidad").alias("cantidad_total"),
    avg("precio_unitario").alias("precio_promedio"),
    count("id_venta").alias("num_ventas")
  )
  .orderBy(desc("ingreso_total"))

ingresosPorProductoDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_producto: int, ingreso_total: double ... 3 more fields]


In [24]:
ingresosPorProductoDF.show()

+-----------+-------------+--------------+---------------+----------+
|id_producto|ingreso_total|cantidad_total|precio_promedio|num_ventas|
+-----------+-------------+--------------+---------------+----------+
|        104|        800.0|            16|           50.0|         5|
|        105|        570.0|            19|           30.0|         5|
|        109|        540.0|            20|           27.0|         4|
|        110|        494.0|            26|           19.0|         4|
|        108|        486.0|            27|           18.0|         5|
|        101|        460.0|            23|           20.0|         6|
|        106|        425.0|            17|           25.0|         5|
|        102|        405.0|            27|           15.0|         6|
|        107|        396.0|            18|           22.0|         5|
|        103|        280.0|            28|           10.0|         5|
+-----------+-------------+--------------+---------------+----------+

