# Filtrado de tablas para obtener información de actividades solo de la asignatura IP


El objetivo de este script es el de filtrar las tablas que recogen información de las actividades llevadas a cabo por los usuarios para que alberguen información relativa solo al curso de ip, y así evitar tener que hacer un join para concatenar usuario con la tarea/foro/cuestionario y otro para unir estas tuplas con aquellas actividades pertenecientes a un curso. 

Además, una vez hecho esto, podemos concatenar en algunos casos información de dos tablas en una sola para reducir la complejidad de las consultas y mejorar la eficiencia de cómputo.

## Configuración 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os

# Configuración
curso_ip = 8683
ruta_origen = "/home/carlos/Documentos/TFG/spark-workspace/data/raw"
ruta_destino = "/home/carlos/Documentos/TFG/spark-workspace/data/raw/ip"
os.makedirs(ruta_destino, exist_ok=True)

# Crear sesión Spark
spark = SparkSession.builder \
    .appName("Filtrado datos curso IP") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/05/26 17:29:04 WARN Utils: Your hostname, carlos-Modern-15-A11SB, resolves to a loopback address: 127.0.1.1; using 158.49.195.162 instead (on interface wlo1)
25/05/26 17:29:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/26 17:29:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Filtrado de assignments y concatenación de entregas y notas

Tras realizar esta operación, nos quedaremos con una única tabla  `assign_submission_grade_cmi` que va a tener información de  todas las tareas entregadas por los estudiantes de ip, junto con la calificación que obtuvieron en ellas.

In [None]:
# Leer datos de asignaciones y envíos
assign = spark.read.parquet(f"{ruta_origen}/assign_cmi.parquet")
submissions = spark.read.parquet(f"{ruta_origen}/assign_submission_cmi.parquet")

# Filtrar tabla para quedarnos solo con las tareas de ip , y quedarnos solo con los campos necesarios
assign_ip = assign.filter(col("course") == curso_ip).select(
    "id", "duedate", "allowsubmissionsfromdate", "name"
)
# Obtener entregas de tareas del curso IP
submissions_ip = (
    submissions.join(assign_ip, submissions.assignment == assign_ip.id, "inner")
    .withColumnRenamed("timemodified", "timesubmitted")
    .drop("id")
)

# Concatenar submissions con grades
grades = spark.read.parquet(f"{ruta_origen}/assign_grades_cmi.parquet")
grades_filtered = grades.select("userid", "assignment", "grade")

assign_submission_grade_cmi = submissions_ip.join(grades_filtered, on=["userid", "assignment"], how="left")

# Escribir el dataframe a parquet para dejarlo listo para métricas
if not os.path.exists(f"{ruta_destino}/assign_submission_cmi.parquet"):
    assign_submission_grade_cmi.write.mode("overwrite").parquet(f"{ruta_destino}/assign_submission_grade_cmi.parquet")
    print("assign_submission_grade_cmi.parquet creado :)")
    

assign_submission_grade_cmi.parquet creado :)


## Filtrado de mensajes en los foros

El resultado de este filtrado será obtener una única tabla `forum_post_cmi` en la que cada tupla tendrá la información de cada mensaje que ha publicado un alumno en cualquiera de los foros de la asignatura ip

In [20]:

# Cargar datos
forum = spark.read.parquet(f"{ruta_origen}/forum_cmi.parquet")
discussions = spark.read.parquet(f"{ruta_origen}/forum_discussions_cmi.parquet")
posts = spark.read.parquet(f"{ruta_origen}/forum_posts_cmi.parquet")



# Filtrar foros del curso IP
forum_ip = forum.filter(col("course") == curso_ip).select("id")  # id = forum_id

# Unir discussions con forum_ip para quedarnos con discusiones de IP
discussions_ip = discussions.join(forum_ip, discussions.forum == forum_ip.id, "inner") \
                            .select(discussions["id"].alias("discussion_id"))

# Unir posts con discussions_ip para quedarnos solo con los posts válidos
posts_ip = posts.join(discussions_ip, posts.discussion == discussions_ip.discussion_id, "inner") \
                .select("userid", "discussion", "created")
                
# Guardar parquet
posts_ip.write.mode("overwrite").parquet(f"{ruta_destino}/forum_posts_ip.parquet")
print(" forum_posts_ip.parquet creado correctamente.")

 forum_posts_ip.parquet creado correctamente.


## Filtrado de cuestionarios

In [21]:

# Cargar datos
quizes = spark.read.parquet(f"{ruta_origen}/quiz_cmi.parquet")
attempts = spark.read.parquet(f"{ruta_origen}/quiz_attempts_cmi.parquet")

quizes.printSchema()
print("========================================")
attempts.printSchema()


root
 |-- id: long (nullable = true)
 |-- course: long (nullable = true)
 |-- timeopen: long (nullable = true)
 |-- timeclose: long (nullable = true)
 |-- name: string (nullable = true)
 |-- timemodified: long (nullable = true)

root
 |-- quiz: long (nullable = true)
 |-- userid: string (nullable = true)
 |-- state: string (nullable = true)
 |-- attempt: long (nullable = true)
 |-- sumgrades: string (nullable = true)
 |-- timestart: long (nullable = true)
 |-- timefinish: long (nullable = true)

