In [150]:
#MÉTODO 1 (perspectiva alumno)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, date_format, avg
from statsmodels.tsa.arima.model import ARIMA
import pandas as pd
from datetime import timedelta
import pytest

In [151]:
# Crear la sesión Spark
spark = SparkSession.builder.appName("PrediccionAsistencia").getOrCreate()

25/01/10 19:25:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [152]:
# Leer el archivo CSV en PySpark
file_path = "asistencia_test.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

In [164]:
# Pasar estado a valor numérico
data = data.withColumn(
    "estado_codificado",
    when(col("estado_asistencia") == "presente", 1)
    .when(col("estado_asistencia") == "tarde", 0.5)
    .otherwise(0)
)

# Agrupamos por alumno, asignatura y día y calcular estado promedio
series_temporales = data.groupBy(
    "alumno",
    "asignatura",
    date_format("timestamp", "yyyy-MM-dd").alias("fecha") # Pasar timestamp a fecha
).agg(
    avg("estado_codificado").alias("estado_promedio")  # Calcula media del estado
)

# Convertir el dataframe de PySpark a Pandas
series_pandas = series_temporales.toPandas()

# Ordenar los datos
series_pandas['fecha'] = pd.to_datetime(series_pandas['fecha'])
series_pandas = series_pandas.sort_values(by=['alumno', 'asignatura', 'fecha'])

# Almacenar resultados de predicciones
resultados_predicciones = []

# ARIMA para cada alumno y asignatura
for (alumno, asignatura), grupo in series_pandas.groupby(["alumno", "asignatura"]):
    grupo = grupo.sort_values("fecha")

    # Ordenar la lista por fecha
    grupo = grupo.set_index("fecha")
    grupo.index.freq = 'D'  # Frecuencia diaria

    serie = grupo["estado_promedio"]

    # Solo entrenamos si hay suficientes datos
    if len(serie) > 10:
        try:
            # Entrenamiento del modelo
            modelo = ARIMA(serie, order=(1, 1, 0)) # p, d, q
            ajuste = modelo.fit()

            # Se predice el día siguiente
            predicciones = ajuste.forecast(steps=1)
            ultima_fecha = grupo.index[-1]

            for i, pred in enumerate(predicciones):
                 dia_predicho = ultima_fecha + timedelta(days=i+1)
                 estado_predicho = pred
                 estado_categoria = (
                     "presente" if estado_predicho >= 0.75 else
                     "tarde" if estado_predicho >= 0.25 else
                     "ausente"
                 )

            resultados_predicciones.append({
                    "alumno": alumno,
                    "asignatura": asignatura,
                    "dia_predicho": dia_predicho.strftime("%Y-%m-%d"),
                    "estado_predicho_valor": estado_predicho,
                    "estado_predicho_categoria": estado_categoria
                })
        except Exception as e:
            print(f"Error al ajustar ARIMA para {alumno}, {asignatura}: {e}")

# Convertimos los resultados a un df de Pandas
df_resultados = pd.DataFrame(resultados_predicciones)

# Exportación de los resultados a un CSV
output_path = "forecast_por_alumno.csv"
df_resultados.to_csv(output_path, index=False)

print(f"Predicciones guardadas en: {output_path}")

Predicciones guardadas en: forecast_por_alumno.csv




In [165]:
# MÉTODO 2 (por asignatura)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, date_format, avg, count
from statsmodels.tsa.arima.model import ARIMA
import pandas as pd
from datetime import timedelta

# Crear la sesión Spark
spark = SparkSession.builder.appName("PrediccionAsistenciaGlobal").getOrCreate()

# Leer el archivo CSV en PySpark
file_path = "asistencia_test.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Formatear las fechas
data = data.withColumn("fecha", date_format(col("timestamp"), "yyyy-MM-dd"))

# Calcular totales diarios por asignatura y estado de asistencia
totales_diarios = data.groupBy(
    "asignatura", "estado_asistencia", "fecha"
).agg(count("alumno").alias("total_diario"))

# Convertir el DataFrame de PySpark a Pandas
totales_pandas = totales_diarios.toPandas()

# Ordenar y preparar las series temporales
totales_pandas["fecha"] = pd.to_datetime(totales_pandas["fecha"])
totales_pandas = totales_pandas.sort_values(by=["asignatura", "estado_asistencia", "fecha"])

# Almacenar resultados de predicciones
resultados_forecast = []

# Agrupar series temporales por asignatura y estado de asistencia
series_agrupadas = totales_pandas.groupby(["asignatura", "estado_asistencia"])

# Iterar sobre cada grupo (asignatura y estado de asistencia)
for (asignatura, estado), grupo in series_agrupadas:
    grupo = grupo.sort_values("fecha")
    grupo = grupo.set_index("fecha")
    grupo.index.freq = 'D'

    # Asegurarse de que hay suficientes datos
    if len(grupo) > 10:
        serie = grupo["total_diario"]
        ultimo_dia = grupo.index[-1]

        try:
            # Entrenamiento del modelo
            modelo = ARIMA(serie, order=(5, 1, 0)) # p, d, q
            ajuste = modelo.fit()

            # Se predice el día siguiente
            prediccion = ajuste.forecast(steps=1)

            # Calcular el día predicho
            dia_predicho = ultimo_dia + timedelta(days=1)

            # Almacenar el resultado
            resultados_forecast.append({
                "asignatura": asignatura,
                "dia": dia_predicho.strftime("%Y-%m-%d"),
                "estado_asistencia": estado,
                "numero_alumnos": round(prediccion.iloc[0])  # Redondear el valor
            })
        except Exception as e:
            print(f"Error al ajustar ARIMA para {asignatura} - {estado}: {e}")

# Convertimos los resultados a un DataFrame de Pandas
df_forecast = pd.DataFrame(resultados_forecast)

# Exportación de los resultados a un CSV
output_path = "forecast_por_asignatura1.csv"
df_forecast.to_csv(output_path, index=False)



print(f"Predicciones globales guardadas en: {output_path}")

Predicciones globales guardadas en: forecast_por_asignatura1.csv


## Funciones para probar el código

In [155]:
def comprobar_num_columnas_spark(df):
    resultado_esperado = 5
    n_columnas = len(df.columns)
    if n_columnas == resultado_esperado:
        print(f"OK -> El DataFrame de Spark tiene {n_columnas} columnas (lo esperado).")
    else:
        print(f"FAIL -> Se esperaban {resultado_esperado} columnas, pero tiene {n_columnas}.")

In [156]:
def comprobar_num_filas_spark(df):
    min_rows = 2710
    n_filas = df.count()
    if n_filas >= min_rows:
        print(f"OK -> El DataFrame de Spark tiene {n_filas} filas, mínimo esperado era {min_rows}.")
    else:
        print(f"FAIL -> El DataFrame de Spark tiene {n_filas} filas, es menos de {min_rows}.")

In [157]:
# Comprobamos que se haga bien el cambio a datatime. Ya qué spark trabaja con Timestamp pero pandas trabaja con datetime para facilitar que se haga el trabajo de
# ordenacion y la medición de fecuencia diaria línea:     grupo.index.freq = 'D'  # Frecuencia diaria
def comprobar_columna_fecha_pandas(df):
    columna_fecha = "fecha"
    if pd.api.types.is_datetime64_any_dtype(df[columna_fecha]):
        print("OK -> La columna 'fecha' en el DataFrame de Pandas es de tipo datetime.")
    else:
        print("FAIL -> La columna 'fecha' NO es de tipo datetime.")

In [158]:
# Comprobamos que el df se rellena bien
def comprobar_resultados_no_vacios(df):
    n_nulls = df.isnull().sum()
    hay_nulls = False
    
    for col, num_nulls in n_nulls.items():
        if num_nulls > 0:
            print(f"FAIL -> La columna '{col}' tiene {num_nulls} valores nulos.")
            hay_nulls = True
        else:
            print(f"OK -> La columna '{col}' no tiene valores nulos.")
    
    if not hay_nulls:
        print("OK -> No se han encontrado nulos en ninguna columna.")

In [159]:
def comprobar_arima_alumnos(df_pandas):
    columnas_esperadas = [
        "alumno",
        "asignatura",
        "dia_predicho",
        "estado_predicho_valor",
        "estado_predicho_categoria"
    ]
        
    for col in columnas_esperadas:
        if col not in df_pandas.columns:
            print(f"FAIL -> Falta la columna '{col}' en los resultados de predicción.")
            return
        else:
            print(f"OK -> La columna '{col}' se encuentra en dentro de columnas_esperadas.")

In [160]:
def comprobar_arima_asignaturas(df_pandas):
    columnas_esperadas = [
        "asignatura",
        "dia",
        "estado_asistencia",
        "numero_alumnos"
    ]
        
    for col in columnas_esperadas:
        if col not in df_pandas.columns:
            print(f"FAIL -> Falta la columna '{col}' en los resultados de predicción.")
            return
        else:
            print(f"OK -> La columna '{col}' se encuentra en dentro de columnas_esperadas.")


In [161]:
def comprobar_duplicados(df_pandas):

    dfprueba = df_pandas[["alumno", "asignatura", "dia_predicho", "estado_predicho_valor", "estado_predicho_categoria"]]
    
    num_filas = len(dfprueba) # numero de filas del df que acabamos de crear
    num_filas_unicas = len(dfprueba.drop_duplicates())
    
    # Si num_filas_unicas == num_filas, significa que no se han quitado filas, por lo tanto no había duplicados.
    if num_filas_unicas < num_filas:
        print("FAIL -> Se encontraron filas duplicadas en el df.")
    else:
        print("OK -> No hay duplicados.")

In [162]:
def comprobar_duplicados_asignaturas(df_pandas):

    dfprueba = df_pandas[["asignatura", "dia", "estado_asistencia", "numero_alumnos"]]
    
    num_filas = len(dfprueba) # numero de filas del df que acabamos de crear
    num_filas_unicas = len(dfprueba.drop_duplicates())
    
    # Si num_filas_unicas == num_filas, significa que no se han quitado filas, por lo tanto no había duplicados.
    if num_filas_unicas < num_filas:
        print("FAIL -> Se encontraron filas duplicadas en el df.")
    else:
        print("OK -> No hay duplicados.")

## EJECUCIÓN DE PRUEBAS POR ALUMNO

In [166]:
comprobar_num_columnas_spark(data)
comprobar_num_filas_spark(data)
comprobar_columna_fecha_pandas(series_pandas)
comprobar_resultados_no_vacios(df_resultados)
comprobar_arima_alumnos(df_resultados)
comprobar_duplicados(df_resultados)

OK -> El DataFrame de Spark tiene 5 columnas (lo esperado).
OK -> El DataFrame de Spark tiene 2710 filas, mínimo esperado era 2710.
OK -> La columna 'fecha' en el DataFrame de Pandas es de tipo datetime.
OK -> La columna 'alumno' no tiene valores nulos.
OK -> La columna 'asignatura' no tiene valores nulos.
OK -> La columna 'dia_predicho' no tiene valores nulos.
OK -> La columna 'estado_predicho_valor' no tiene valores nulos.
OK -> La columna 'estado_predicho_categoria' no tiene valores nulos.
OK -> No se han encontrado nulos en ninguna columna.
OK -> La columna 'alumno' se encuentra en dentro de columnas_esperadas.
OK -> La columna 'asignatura' se encuentra en dentro de columnas_esperadas.
OK -> La columna 'dia_predicho' se encuentra en dentro de columnas_esperadas.
OK -> La columna 'estado_predicho_valor' se encuentra en dentro de columnas_esperadas.
OK -> La columna 'estado_predicho_categoria' se encuentra en dentro de columnas_esperadas.
OK -> No hay duplicados.


## EJECUCIÓN DE PRUEBAS POR ASIGNATURA

In [167]:
comprobar_resultados_no_vacios(df_forecast)
comprobar_arima_asignaturas(df_forecast)
comprobar_duplicados_asignaturas(df_forecast)

OK -> La columna 'asignatura' no tiene valores nulos.
OK -> La columna 'dia' no tiene valores nulos.
OK -> La columna 'estado_asistencia' no tiene valores nulos.
OK -> La columna 'numero_alumnos' no tiene valores nulos.
OK -> No se han encontrado nulos en ninguna columna.
OK -> La columna 'asignatura' se encuentra en dentro de columnas_esperadas.
OK -> La columna 'dia' se encuentra en dentro de columnas_esperadas.
OK -> La columna 'estado_asistencia' se encuentra en dentro de columnas_esperadas.
OK -> La columna 'numero_alumnos' se encuentra en dentro de columnas_esperadas.
OK -> No hay duplicados.
