### Objetivo del laboratorio
Aplicar funciones de ventana para analizar rendimiento académico, ranking de estudiantes y evolución de notas en una universidad ficticia.

**Preparación**

- Cargar `estudiantes.csv, cursos.csv, profesores.csv, matriculas.csv.`

- Crear DataFrame `matriculas_enriched` con joins para enriquecer con nombre de estudiante, curso y facultad.

**Caso 1 – Ranking de estudiantes por carrera**

Identificar a los 3 mejores estudiantes en cada carrera por promedio de nota.

- `AVG(nota_final)` por estudiante y carrera.

- `RANK() OVER (PARTITION BY carrera ORDER BY AVG(nota_final) DESC).`

- Guardar tabla Delta `ranking_estudiantes_carrera.`

In [0]:
from pyspark.sql.functions import col, avg, round, rank, expr, lag, lit, when, sum
from pyspark.sql.window import Window

In [0]:
catalog_name = "dmc_06"
schema_name = "universidad"

spark.sql(
    f"CREATE CATALOG IF NOT EXISTS {catalog_name}"
)
spark.sql(
    f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}"
)

In [0]:
path_base="/Volumes/dmc_06/default/universidad/input/"

path_estudiantes=f"{path_base}estudiantes.csv"
path_cursos=f"{path_base}cursos.csv"
path_profesores=f"{path_base}profesores.csv"
path_matriculas=f"{path_base}matriculas.csv"


In [0]:
df_estudiantes= spark.read.option("header",True).option("inferSchema",True).csv(path_estudiantes)
df_cursos= spark.read.option("header",True).option("inferSchema",True).csv(path_cursos)
df_profesores= spark.read.option("header",True).option("inferSchema",True).csv(path_profesores)
df_matriculas= spark.read.option("header",True).option("inferSchema",True).csv(path_matriculas)

In [0]:
df_matriculas_clean=(
    df_matriculas
    .dropna(subset=["id_estudiante","id_curso","nota_final"])
    .filter(col("nota_final")>=0)
)

In [0]:
df_matriculas_enriched = (
    df_matriculas_clean.alias("mc")
    .join(
        df_estudiantes.alias("e"),
        col("mc.id_estudiante") == col("e.id_estudiante"),
        "left"
    )
    .join(
        df_cursos.alias("c"),
        col("mc.id_curso") == col("c.id_curso"),
        "left"
    )
    .drop(
        col("e.id_estudiante"),
        col("c.id_curso")
    )
)

In [0]:
display(df_matriculas_enriched)

In [0]:
%python
df_avg_estudiante_carrera = (
    df_matriculas_enriched
    .groupBy("id_estudiante", "carrera")
    .agg(round(avg(col("nota_final")), 2).alias("promedio"))
)

In [0]:
windows_rank = Window.partitionBy("carrera").orderBy(col("promedio").desc())

df_ranking_estudiantes_carrera = (
    df_avg_estudiante_carrera
    .withColumn("ranking", rank().over(windows_rank))
    .filter(col("ranking") <= 3)
)

In [0]:
display(df_ranking_estudiantes_carrera)

In [0]:
df_ranking_estudiantes_carrera.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_name}.ranking_estudiantes_carrera")

In [0]:
%sql
select * from dmc_06.universidad.ranking_estudiantes_carrera

**Caso 2 – Evolución de notas de cada estudiante**

Ver cómo varían las notas de cada alumno a lo largo de los semestres.

- Calcular promedio de notas por semestre.

- Usar `LAG()` para obtener nota promedio del semestre anterior.

- Columna `variacion = nota_promedio - nota_semestre_anterior.`

- Guardar tabla Delta `variacion_notas_estudiante.`

In [0]:
expr_ord = expr("int(split(semestre, '-')[0]) * 100 + int(split(semestre, '-')[1])")

df_avg_estudiante_semestre = (
    df_matriculas_enriched
    .groupBy("id_estudiante", "semestre")
    .agg(round(avg(col("nota_final")), 2).alias("promedio"))
    .withColumn(
        "semestre_orden",
        expr_ord
    )
)

In [0]:
display(df_avg_estudiante_semestre)

In [0]:
windows_lag = Window.partitionBy("id_estudiante").orderBy(col("semestre_orden"))

df_variacion_notas_estudiante = (
    df_avg_estudiante_semestre
    .withColumn("nota_anterior", lag("promedio").over(windows_lag))
    .withColumn("variacion", col("promedio") - col("nota_anterior"))
)

In [0]:
display(df_variacion_notas_estudiante)

In [0]:
df_variacion_notas_estudiante.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_name}.variacion_notas_estudiante")

In [0]:
%sql
select * from dmc_06.universidad.variacion_notas_estudiante

**Caso 3 – Acumulado de créditos aprobados**

Finanzas académicas quiere saber cuántos créditos ha acumulado cada estudiante.

- Considerar cursos con nota_final >= 11 como aprobados.

- Usar `SUM(creditos) OVER (PARTITION BY id_estudiante ORDER BY semestre).`

- Crear columna `estado`:

  - Menos de 30 créditos → “Inicial”

  - 30–90 créditos → “Intermedio”

  - Más de 90 créditos → “Avanzado”

- Guardar tabla Delta `creditos_acumulados.`

In [0]:
df_aprobados = (
    df_matriculas_enriched.alias("me")
    .join(
        df_cursos.alias("c"),
        col("me.id_curso") == col("c.id_curso"),
        "left"
    )
    .filter(col("me.nota_final") >= 11)
    .select(
        col("me.id_estudiante"),
        col("me.semestre"),
        col("c.creditos")
    )
    .withColumn(
        "semestre_orden",
        expr_ord
    ) 
)

In [0]:
display(df_aprobados)

In [0]:
windows_acum = Window.partitionBy("id_estudiante").orderBy(col("semestre_orden"))

df_creditos_acumulados = (
    df_aprobados
    .withColumn("creditos_acumulados", sum("creditos").over(windows_acum))
    .withColumn(
        "estado",
        when(col("creditos_acumulados") < 30, lit("Inicial"))
        .when(
            (col("creditos_acumulados") >= 30) &
            (col("creditos_acumulados") < 90), 
            lit("Intermedio")
        )
        .otherwise(lit("Avanzado"))
    )       
)

In [0]:
df_creditos_acumulados.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_name}.creditos_acumulados")

In [0]:
%sql

select * from dmc_06.universidad.creditos_acumulados

**Validaciones**

¿Quién es el mejor estudiante de Ingeniería?

¿Qué estudiante tuvo la mayor mejora de un semestre a otro?

¿Cuántos estudiantes ya están en estado “Avanzado”?