INTEGRANTES:


In [None]:
!pip install pyspark

# Set up

We will import the files needed for the exercises as follows:

In [None]:
from google.colab import drive

# Gain access to the source files in GDrive.
# The mount point must stay in sync with dataPath, which points inside /content/drive.
drive.mount('/content/drive')

In [None]:
import os

# Set Data Path: folder of the mounted Drive that holds the input CSV files
# and where the CSV files generated by this notebook are written
dataPath = "/content/drive/MyDrive/Data/2425Q1-DBDP1"
# Show Data Folders (the listing is the cell output; it raises if the folder does not exist)
os.listdir(dataPath)

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Reuse the session when the cell is re-run in the same kernel; build it only the first time
if 'spark' in globals():
  print("Spark session already exists!!!")
else:
  # "local[*]" runs Spark locally on every available core.
  #   Replace the "*" with the number of parallel processors your job requires
  #   (Google Colab offers maximum 2)
  conf = (
      SparkConf()
      .set("spark.master", "local[*]")
      .set("spark.app.name", "Analysis of video visualizations")
  )
  spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Constants Definition

In [None]:
# Course name that the (prefix-stripped) login Event text must equal
courseName="Disseny de Bases de Dades"
# Only log entries strictly after this instant are kept
firstClassDate="2024-09-13 14:00:00"
# Only log entries strictly before this instant are kept; also the reference for the "before exam" features
examDate="2024-11-13 12:00:00"
# Drawn as reference lines in the weekly time-series charts
lastSessionDate="2024-10-23"
plannedExamDate="2024-11-06"
# Day-of-week abbreviation passed to next_day() to bucket activity per week
sessionDay="Wed"
# Number of days used, together with examDate, to build the "ExamMinusWeek" cut-off
beforeExamMove=7
# Usernames excluded from the logs
irrelevantUsers=[]
# Slide names excluded from the slide logs
irrelevantSlides=[]

# Data Preparation

## Clean Logs

In [None]:
from pyspark.sql.functions import try_to_timestamp, lit

# Obtain classes list.
# Start and End are parsed with the 'd/MM/yy, HH:mm' pattern; try_to_timestamp
# returns NULL (instead of failing) for values that cannot be parsed.
classes = (
    spark.read.csv(dataPath+"/InClass.csv", header='true', inferSchema='true', sep=';')
    .withColumn("Start", try_to_timestamp("Start", lit('d/MM/yy, HH:mm')))
    .withColumn("End", try_to_timestamp("End", lit('d/MM/yy, HH:mm')))
    .cache()
)
print(f"Classes loaded: {classes.count()}")
classes.show(n=5, truncate=False)

In [None]:
from pyspark.sql.functions import try_to_timestamp, trim, lit, dayofweek, hour

# Obtain Moodle logs dataset
load = (
    spark.read.csv(dataPath+"/logs.csv", header='true', inferSchema='true', sep=',')
    .drop("Affected user", "IP address", "Origin", "Description", "Event name")
    # Positional rename of the columns that are left after the drop
    .toDF("Timestamp", "Username", "Event", "Component")
    # Trim user name and event names (not really necessary)
    .withColumn("Username", trim("Username"))
    .withColumn("Event", trim("Event"))
    # Transform the timestamp into the right data type
    # (to deal with days of one digit, only one "d" must be provided in the format)
    .withColumn("Timestamp", try_to_timestamp("Timestamp", lit('d/MM/yy, HH:mm')))
    # Take the right time interval of the log
    .where(f"Timestamp>'{firstClassDate}'")
    .where(f"Timestamp<'{examDate}'")
    # Leave out the users listed in irrelevantUsers
    .where("Username NOT IN ('"+("','".join(irrelevantUsers))+"')")
    # Derived columns used by the per-weekday and per-hour summaries
    .withColumn("Weekday", dayofweek("Timestamp"))
    .withColumn("Hour", hour("Timestamp"))
    .cache()
)
print("Logs loaded: ", load.count())
load.show(5)

In [None]:
# Remove log entries during classes:
# the anti join keeps only the log rows whose Timestamp lies in no [Start, End] class interval
logs = load.join(
    classes,
    on=(classes.Start <= load.Timestamp) & (load.Timestamp <= classes.End),
    how="leftanti",
).cache()
print("Logs excluding classes: ", logs.count())
logs.show(5)

### Logins

In [None]:
from pyspark.sql.functions import substr, lit

# Remove irrelevant rows and columns
reducedLoginLogs = (
    logs
    .where("Component='System'")
    .drop("Component")
    # Keep the Event text from its 9th character on; what is left must equal the course name
    .withColumn("Event", substr("Event", lit(9)))
    .where(f"Event='{courseName}'")
    .drop("Event")
    .cache()
)
print("Reduced login logs: ", reducedLoginLogs.count())
reducedLoginLogs.show(n=5, truncate=False)

In [None]:
from itertools import chain

from pyspark.sql import Row
from pyspark.sql.functions import col, create_map, lit

# dayofweek() numbering: 1=Sunday ... 7=Saturday. The class was on Wednesday (4),
# so every weekday is labelled by its distance in days from the class day.
daysMap={'1': "Clase+4", '2': "Clase+5", '3': "Clase+6", '4': "Clase", '5': "Clase+1", '6': "Clase+2", '7': "Clase+3"}

# Translate the weekday number with a Spark map literal instead of a Python lambda
# over the RDD: it stays inside the DataFrame API (no per-row Python call) and does
# not need to infer a schema, which fails when there are no rows.
weekdayLabel = create_map(*[lit(item) for item in chain.from_iterable(daysMap.items())])

loginLogsPerWeekday = (
    reducedLoginLogs
    .select(weekdayLabel[col("Weekday").cast("string")].alias("Weekday"))
    .groupBy("Weekday").count()
    .sort("Weekday")
    .cache()
)
loginLogsPerWeekday.toPandas().to_csv(dataPath+'/loginLogsPerWeekday.csv')
loginLogsPerWeekday.show()

In [None]:
# Number of login events per hour of the day, exported to CSV and displayed
loginLogsPerHour = (
    reducedLoginLogs
    .groupBy("Hour")
    .count()
    .sort("Hour")
    .cache()
)
loginLogsPerHour.toPandas().to_csv(dataPath+'/loginLogsPerHour.csv')
loginLogsPerHour.show()

In [None]:
# Take only one login per student and day
from pyspark.sql.functions import to_date

cleanLoginLogs = (
    reducedLoginLogs
    # Hour must go before distinct(): otherwise two logins of the same student on the
    # same day but at different hours are still different rows and the day is counted twice
    .drop("Hour")
    # Truncate the timestamp to its date
    .withColumn("Timestamp", to_date("Timestamp"))
    .distinct()
    .cache()
)

print("Clean login logs: ", cleanLoginLogs.count())
cleanLoginLogs.show(n=5, truncate=False)

In [None]:
# Summarize login logs cardinalities
print(f"Total logs: {logs.count()}")
# This should be smaller, because removes many other log events
print(f"Total login logs after reduction: {reducedLoginLogs.count()}")
# This should be smaller, because groups many dates
print(f"Total login logs after cleaning: {cleanLoginLogs.count()}")

In [None]:
# Release memory
reducedLoginLogs.unpersist()
# Remove the variable as well to avoid unwillingly accessing it
del reducedLoginLogs
print("Variables removed!!!")

### Video

In [None]:
# Remove irrelevant rows and columns: keep only the entries of the 'URL' component
reducedVideoLogs = (
    logs
    .where("Component='URL'")
    .drop("Component")
    .cache()
)
print("Reduced video logs: ", reducedVideoLogs.count())
reducedVideoLogs.show(n=5, truncate=False)

In [None]:
from itertools import chain

from pyspark.sql import Row
from pyspark.sql.functions import col, create_map, lit

# dayofweek() numbering: 1=Sunday ... 7=Saturday. The class was on Wednesday (4),
# so every weekday is labelled by its distance in days from the class day.
daysMap={'1': "Clase+4", '2': "Clase+5", '3': "Clase+6", '4': "Clase", '5': "Clase+1", '6': "Clase+2", '7': "Clase+3"}

# Translate the weekday number with a Spark map literal instead of a Python lambda
# over the RDD: it stays inside the DataFrame API (no per-row Python call) and does
# not need to infer a schema, which fails when there are no rows.
weekdayLabel = create_map(*[lit(item) for item in chain.from_iterable(daysMap.items())])

videoLogsPerWeekday = (
    reducedVideoLogs
    .select(weekdayLabel[col("Weekday").cast("string")].alias("Weekday"))
    .groupBy("Weekday").count()
    .sort("Weekday")
    .cache()
)
videoLogsPerWeekday.toPandas().to_csv(dataPath+'/videoLogsPerWeekday.csv')
videoLogsPerWeekday.show()

In [None]:
# Number of video events per hour of the day, exported to CSV and displayed
videoLogsPerHour = (
    reducedVideoLogs
    .groupBy("Hour")
    .count()
    .sort("Hour")
    .cache()
)
videoLogsPerHour.toPandas().to_csv(dataPath+'/videoLogsPerHour.csv')
videoLogsPerHour.show()

In [None]:
from pyspark.sql.functions import substr, lit, min, count

cleanVideoLogs = (
    reducedVideoLogs
    # Remove the first irrelevant characters of the Event name (keep from the 13th on)
    .withColumn("Event", substr("Event", lit(13)))
    # One row per student and video: timestamp of the first visualization
    # and total number of visualizations
    .groupBy("Username","Event")
    .agg(
        min("Timestamp").alias("Timestamp"),
        count("*").alias("VideoRepetitions"),
    )
    .cache()
)
print("Clean video logs: ", cleanVideoLogs.count())
cleanVideoLogs.show(n=5, truncate=False)

In [None]:
# Summarize video logs cardinalities
print(f"Total logs: {logs.count()}")
# This should be smaller, because removes many other log events
print(f"Total video logs after reduction: {reducedVideoLogs.count()}")
# This should be smaller, because groups the repeated visualizations of every student and video
print(f"Total video logs after cleaning: {cleanVideoLogs.count()}")

In [None]:
# Release memory
reducedVideoLogs.unpersist()
# Remove the variable as well to avoid unwillingly accessing it
del reducedVideoLogs
print("Variables removed!!!")

### Slide

In [None]:
# Remove irrelevant rows and columns: keep only the entries of the 'File' component
reducedSlideLogs = (
    logs
    .where("Component='File'")
    .drop("Component")
    .cache()
)
print("Reduced slide logs: ", reducedSlideLogs.count())
reducedSlideLogs.show(n=5, truncate=False)

In [None]:
from itertools import chain

from pyspark.sql import Row
from pyspark.sql.functions import col, create_map, lit

# dayofweek() numbering: 1=Sunday ... 7=Saturday. The class was on Wednesday (4),
# so every weekday is labelled by its distance in days from the class day.
daysMap={'1': "Clase+4", '2': "Clase+5", '3': "Clase+6", '4': "Clase", '5': "Clase+1", '6': "Clase+2", '7': "Clase+3"}

# Translate the weekday number with a Spark map literal instead of a Python lambda
# over the RDD: it stays inside the DataFrame API (no per-row Python call) and does
# not need to infer a schema, which fails when there are no rows.
weekdayLabel = create_map(*[lit(item) for item in chain.from_iterable(daysMap.items())])

slideLogsPerWeekday = (
    reducedSlideLogs
    .select(weekdayLabel[col("Weekday").cast("string")].alias("Weekday"))
    .groupBy("Weekday").count()
    .sort("Weekday")
    .cache()
)
slideLogsPerWeekday.toPandas().to_csv(dataPath+'/slideLogsPerWeekday.csv')
slideLogsPerWeekday.show()

In [None]:
# Number of slide events per hour of the day, exported to CSV and displayed
slideLogsPerHour = (
    reducedSlideLogs
    .groupBy("Hour")
    .count()
    .sort("Hour")
    .cache()
)
slideLogsPerHour.toPandas().to_csv(dataPath+'/slideLogsPerHour.csv')
slideLogsPerHour.show()

In [None]:
from pyspark.sql.functions import startswith, substr, lit, min, count

cleanSlideLogs = (
    reducedSlideLogs
    # Keep only the file events that are slides
    .where(startswith("Event", lit("File: Slides: ")))
    # Remove the first irrelevant characters of the Event name (keep from the 15th on)
    .withColumn("Event", substr("Event", lit(15)))
    # Leave out the slides listed in irrelevantSlides
    .where("Event NOT IN ('"+("','".join(irrelevantSlides))+"')")
    # One row per student and slide: timestamp of the first access and total number of accesses
    .groupBy("Username","Event")
    .agg(
        min("Timestamp").alias("Timestamp"),
        count("*").alias("SlideRepetitions"),
    )
    .cache()
)

print("Clean slide logs: ", cleanSlideLogs.count())
cleanSlideLogs.show(n=5, truncate=False)

In [None]:
# Summarize slide logs cardinalities
print(f"Total logs: {logs.count()}")
# This should be smaller, because removes many other log events
print(f"Total slide logs after reduction: {reducedSlideLogs.count()}")
# This should be smaller, because removes irrelevant slides and groups many dates
print(f"Total slide logs after cleaning: {cleanSlideLogs.count()}")

In [None]:
# Release memory
for cached in (logs, reducedSlideLogs):
  cached.unpersist()
del cached
# Remove variables to avoid unwillingly accessing them
del logs, reducedSlideLogs
print("Variables removed!!!")

## Generate Users

In [None]:
# Every student that appears in the video or the slide logs, sorted,
# gathered in a single partition so that only one CSV part file is written
userList = (
    cleanVideoLogs.select("Username")
    .union(cleanSlideLogs.select("Username"))
    .distinct()
    .sort("Username")
    .coalesce(1)
    .cache()
)
print("Total users: ", userList.count())
userList.show(n=5, truncate=False)
userList.write.csv(path=dataPath+"/Usernames", mode="overwrite", header=True)
print("CSV written!!!")
# The list is only needed for the export: release it and remove the variable
userList.unpersist()
del userList
print("Variables removed!!!")

## Generate Elements List

### Video

In [None]:
# Distinct video names found in the logs, in a single partition (one CSV part file)
videoList = (
    cleanVideoLogs
    .select("Event")
    .distinct()
    .coalesce(1)
    .cache()
)
print("Total de videos: ", videoList.count())
videoList.show(n=5, truncate=False)
videoList.write.csv(path=dataPath+"/Videos", mode="overwrite", header=True)
print("CSV written!!!")
# The list is only needed for the export: release it and remove the variable
videoList.unpersist()
del videoList
print("Variables removed!!!")

### Slide

In [None]:
# Distinct slide names found in the logs, in a single partition (one CSV part file)
slideList = (
    cleanSlideLogs
    .select("Event")
    .distinct()
    .coalesce(1)
    .cache()
)
print("Total de slides: ", slideList.count())
slideList.show(n=15, truncate=False)
slideList.write.csv(path=dataPath+"/Slides", mode="overwrite", header=True)
print("CSV written!!!")
# The list is only needed for the export: release it and remove the variable
slideList.unpersist()
del slideList
print("Variables removed!!!")

## Join Sessions and Students

In [None]:
# Obtain sessions list (tab-separated file); SessionTimestamp is parsed with the
# same 'd/MM/yy, HH:mm' pattern as the logs
sessions = (
    spark.read.csv(dataPath+"/Sessions.csv", header='true', inferSchema='true', sep='\t')
    .withColumn("SessionTimestamp", try_to_timestamp("SessionTimestamp", lit('d/MM/yy, HH:mm')))
    .cache()
)
print(f"Sessions loaded: {sessions.count()}")
sessions.show(n=3, truncate=False)

### Video

In [None]:
# Obtain videos per session list
videos = spark.read.csv(
    dataPath+"/VideosPerSession.csv",
    header='true',
    inferSchema='true',
    sep=';',
).cache()

print(f"Videos per session loaded: {videos.count()}")
videos.show(n=3, truncate=False)

In [None]:
# Enrich the videos with the session information
# sessionCounter: number of videos of every session. The key is renamed to SessionID so
# that it can be told apart from videos.Session in the join below.
sessionCounter=videos \
                    .withColumnRenamed("Session", "SessionID") \
                    .groupBy("SessionID") \
                    .count() \
                    .withColumnRenamed("count", "TotalVideosInSession")
# videosEnriched: every video with the size of its session and the columns of the sessions
# table; the join keys (SessionID) and SessionName are dropped afterwards and the video
# name column is renamed from Event to EventID.
videosEnriched = videos \
              .join(sessionCounter, videos.Session==sessionCounter.SessionID) \
              .join(sessions, videos.Session==sessions.SessionID) \
              .drop(*['SessionID', 'SessionName']) \
              .withColumnRenamed("Event", "EventID") \
              .cache()

# Both joins are inner joins, so videos whose session is missing from the sessions table are lost
print("Videos enriched: "+str(videosEnriched.count()))
videosEnriched.show(3)

In [None]:
from pyspark.sql.functions import concat_ws

# Obtain students list except NP.
# UsernameID ("<First name> <Surname>") is the key later compared with the Username of the logs.
students = (
    spark.read.csv(dataPath+"/StudentsNotNP.csv", header='true', inferSchema='true', sep=';')
    .withColumn("UsernameID", concat_ws(" ","First name", "Surname"))
    .drop(
        "ID number",
        "First name",
        "Surname",
        "Institution",
        "Department",
        "Email address",
        "Last downloaded from this course",
    )
    .cache()
)

print(f"Students loaded: {students.count()}")
students.show(truncate=False)

In [None]:
from pyspark.sql.functions import col

# Join events and videos for all students:
# every (student, video) combination gets a row, with the log data when the student
# watched the video and NULL Timestamp / 0 repetitions otherwise
fullVideos = (
    students.crossJoin(videosEnriched)
    .join(
        cleanVideoLogs,
        [col("EventID")==col("Event"), col("UsernameID")==col("Username")],
        'left_outer',
    )
    .select(
        "EventID",
        "UsernameID",
        "Session",
        "TotalVideosInSession",
        "SessionTimestamp",
        "Timestamp",
        "VideoRepetitions",
    )
    .fillna(0,"VideoRepetitions")
    .cache()
)
print(f"Total videos times students: {fullVideos.count()}")
fullVideos.show(truncate=False)

In [None]:
# Summarize logs cardinalities
print(f"Total video logs after cleaning: {cleanVideoLogs.count()}")
# This should be larger, because takes all combinations of videos and students
print(f"Total logs after enrichment: {fullVideos.count()}")

### Slide

In [None]:
# Obtain slides per session list
slidesPerSession = spark.read.csv(
    dataPath+"/SlidesPerSession.csv",
    header='true',
    inferSchema='true',
    sep=';',
).cache()
# Report the size of the dataframe that was just loaded (not of the sessions table)
print("Slides loaded: "+str(slidesPerSession.count()))
slidesPerSession.show(n=6, truncate=False)

In [None]:
from pyspark.sql.functions import col

# Enrich the slides with the student and session information:
# every (slide, student) combination gets a row, with the log data when the student
# opened the slide and NULL Timestamp / 0 repetitions otherwise
fullSlides = (
    slidesPerSession.crossJoin(students.select("UsernameID"))
    .join(
        cleanSlideLogs,
        [col("Slide")==col("Event"), col("UsernameID")==col("Username")],
        'left_outer',
    )
    .join(sessions, col("Session")==col("SessionID"))
    .drop("Event", "Slide", "Username", "SessionID", "SessionName")
    # Use the exact column name: "UsernameId" only matched through Spark's default
    # case-insensitive resolution, and withColumnRenamed silently does nothing when
    # the column is not found
    .withColumnRenamed("UsernameID", "Username")
    .fillna(0,"SlideRepetitions")
    .cache()
)

print("Full slides: "+str(fullSlides.count()))
fullSlides.show(n=20, truncate=False)

In [None]:
# Release memory
for cached in (sessions, videos, slidesPerSession, videosEnriched):
  cached.unpersist()
del cached
# Remove variables to avoid unwillingly accessing them
del sessions, videos, slidesPerSession, videosEnriched
print("Variables removed!!!")

## Feature Engineering

### Login

In [None]:
# date_add is not used here any more; it stays imported in case other cells expect it in scope
from pyspark.sql.functions import col, date_add, date_sub, lit, sum, try_to_timestamp, when

# Split the login days of every student in two periods:
#  - DuringSessionsLogin: before the cut-off ExamMinusWeek = examDate - beforeExamMove days
#  - BeforeExamLogin: from the cut-off on (all logs end before examDate)
# The cut-off must be SUBTRACTED from the exam date: adding the days placed it after
# the exam, so every login fell in the first period and the second counter was always 0.
loginsAggregatedPerStudent = (
    cleanLoginLogs
    .withColumn(
        "ExamMinusWeek",
        date_sub(try_to_timestamp(lit(examDate), lit('yyyy-MM-dd HH:mm:ss')), lit(beforeExamMove)),
    )
    .withColumn("DuringSessionsLogin", when(col("Timestamp")<col("ExamMinusWeek"), 1).otherwise(0))
    .withColumn("BeforeExamLogin", when(col("Timestamp")<col("ExamMinusWeek"), 0).otherwise(1))
    .groupBy("Username")
    .agg(
        sum("DuringSessionsLogin").alias("DuringSessionsLoginCounter"),
        sum("BeforeExamLogin").alias("BeforeExamLoginCounter"),
    )
    .cache()
)

print("Total login logs: "+str(cleanLoginLogs.count()))
print("Logins per student: "+str(loginsAggregatedPerStudent.count()))
loginsAggregatedPerStudent.show(61)

### Video

#### Before Session

In [None]:
from pyspark.sql.functions import when

# Mark with NULL those visualizations not before the session
# (rows are kept, only the Timestamp is blanked, so the row count must not change)
beforeSessionVideoLogs = (
    fullVideos
    .withColumn(
        "Timestamp",
        when(col("Timestamp")>=col("SessionTimestamp"), None).otherwise(col("Timestamp")),
    )
    .cache()
)

print(f"Total video logs: {fullVideos.count()}")
print(f"Video logs before session: {beforeSessionVideoLogs.count()}")
beforeSessionVideoLogs.show(truncate=False)

In [None]:
from pyspark.sql import functions as sf

# Compute the percentage of videos visualized before the session.
# The viewed videos are counted by their EventID (only for rows that still have a
# Timestamp). Counting distinct Timestamps undercounted: timestamps have minute
# resolution, so two videos first opened in the same minute were counted once.
beforeSessionVideosAggregatedPerSession = (
    beforeSessionVideoLogs
    .groupBy("UsernameID", "Session")
    .agg(
        sf.count_distinct(
            sf.when(sf.col("Timestamp").isNotNull(), sf.col("EventID"))
        ).alias("VideosInSessionCounter"),
        sf.any_value("TotalVideosInSession").alias("TotalVideosInSession"),
        # Repetitions are summed for every video of the session, viewed before it or not
        sf.sum("VideoRepetitions").alias("VideoRepetitionsCounter"),
    )
    .withColumn("BeforeSessionVideoPercent", sf.col("VideosInSessionCounter")/sf.col("TotalVideosInSession"))
    .withColumn("AvgVideoRepetitions", sf.col("VideoRepetitionsCounter")/sf.col("TotalVideosInSession"))
    .drop("VideoRepetitionsCounter")
    .cache()
)
# This should be students*sessions
print("Video logs aggregated per session: "+str(beforeSessionVideosAggregatedPerSession.count()))
beforeSessionVideosAggregatedPerSession.show(truncate=False)

#### One Week Before Exam

In [None]:
# date_add is not used here any more; it stays imported in case other cells expect it in scope
from pyspark.sql.functions import col, date_add, date_sub, lit, try_to_timestamp, when

# Mark with NULL those visualizations that did not happen before the cut-off
# ExamMinusWeek = examDate - beforeExamMove days.
# The days must be SUBTRACTED: adding them placed the cut-off after the exam, where
# no log exists, so this dataframe was identical to the "before exam" one.
beforeExamMinusWeekVideoLogs = (
    fullVideos
    .withColumn(
        "ExamMinusWeek",
        date_sub(try_to_timestamp(lit(examDate), lit('yyyy-MM-dd HH:mm:ss')), lit(beforeExamMove)),
    )
    .withColumn("Timestamp", when(col("Timestamp")>=col("ExamMinusWeek"), None).otherwise(col("Timestamp")))
    .drop("ExamMinusWeek")
    .cache()
)

print("Total video logs: "+str(fullVideos.count()))
print("Video logs one week before exam: "+str(beforeExamMinusWeekVideoLogs.count()))
beforeExamMinusWeekVideoLogs.show()

In [None]:
from pyspark.sql import functions as sf

# Compute the percentage of videos visualized one week before the exam.
# The viewed videos are counted by their EventID (only for rows that still have a
# Timestamp). Counting distinct Timestamps undercounted: timestamps have minute
# resolution, so two videos first opened in the same minute were counted once.
beforeExamMinusWeekVideosAggregatedPerSession = (
    beforeExamMinusWeekVideoLogs
    .groupBy("UsernameID", "Session")
    .agg(
        sf.count_distinct(
            sf.when(sf.col("Timestamp").isNotNull(), sf.col("EventID"))
        ).alias("VideosInSessionCounter"),
        sf.any_value("TotalVideosInSession").alias("TotalVideosInSession"),
    )
    .withColumn("BeforeExamMinusWeekVideoPercent", sf.col("VideosInSessionCounter")/sf.col("TotalVideosInSession"))
)
# This should be students*sessions
print("Video logs aggregated per session: "+str(beforeExamMinusWeekVideosAggregatedPerSession.count()))
beforeExamMinusWeekVideosAggregatedPerSession.show()


#### Before Exam

In [None]:
from pyspark.sql.functions import date_add

# Mark with NULL those visualizations not before the exam
# (rows are kept, only the Timestamp is blanked, so the row count must not change)
beforeExamVideoLogs = (
    fullVideos
    .withColumn("Exam", try_to_timestamp(lit(examDate), lit('yyyy-MM-dd HH:mm:ss')))
    .withColumn(
        "Timestamp",
        when(col("Timestamp")>=col("Exam"), None).otherwise(col("Timestamp")),
    )
    .drop("Exam")
    .cache()
)

print(f"Total video logs: {fullVideos.count()}")
print(f"Video logs before exam: {beforeExamVideoLogs.count()}")
beforeExamVideoLogs.show()


In [None]:
from pyspark.sql import functions as sf

# Compute the percentage of videos visualized before the exam.
# The viewed videos are counted by their EventID (only for rows that still have a
# Timestamp). Counting distinct Timestamps undercounted: timestamps have minute
# resolution, so two videos first opened in the same minute were counted once.
beforeExamVideosAggregatedPerSession = (
    beforeExamVideoLogs
    .groupBy("UsernameID", "Session")
    .agg(
        sf.count_distinct(
            sf.when(sf.col("Timestamp").isNotNull(), sf.col("EventID"))
        ).alias("VideosInSessionCounter"),
        sf.any_value("TotalVideosInSession").alias("TotalVideosInSession"),
        sf.sum("VideoRepetitions").alias("VideoRepetitionsCounter"),
    )
    .withColumn("BeforeExamVideoPercent", sf.col("VideosInSessionCounter")/sf.col("TotalVideosInSession"))
    # Average repetitions over the videos actually viewed. The counter is 0 for students
    # who viewed no video of the session: the guard leaves NULL there instead of dividing
    # by zero, which raises an error when Spark runs in ANSI mode.
    .withColumn(
        "AvgViewedVideoRepetitions",
        sf.when(
            sf.col("VideosInSessionCounter") > 0,
            sf.col("VideoRepetitionsCounter")/sf.col("VideosInSessionCounter"),
        ),
    )
    .drop("VideoRepetitionsCounter")
    .cache()
)
# This should be students*sessions
print("Video logs aggregated per session: "+str(beforeExamVideosAggregatedPerSession.count()))
beforeExamVideosAggregatedPerSession.show()

### Slide

In [None]:
from pyspark.sql.functions import col, date_sub, lit, try_to_timestamp, when

# Flag (1/0) every (slide, student) row according to when the slide was first opened:
# before its session, before the cut-off ExamMinusWeek and before the exam.
# The columns are named "...Percent" because they are averaged per student later on.
# ExamMinusWeek = examDate - beforeExamMove days. The days must be SUBTRACTED: adding
# them placed the cut-off after the exam and made the flag equal to the "before exam" one.
slideAggregates = (
    fullSlides
    .withColumn("BeforeSessionSlidePercent", when(col("Timestamp")<col("SessionTimestamp"), 1).otherwise(0))
    .withColumn(
        "ExamMinusWeek",
        date_sub(try_to_timestamp(lit(examDate), lit('yyyy-MM-dd HH:mm:ss')), lit(beforeExamMove)),
    )
    .withColumn("BeforeExamMinusWeekSlidePercent", when(col("Timestamp")<col("ExamMinusWeek"), 1).otherwise(0))
    .withColumn("Exam", try_to_timestamp(lit(examDate), lit('yyyy-MM-dd HH:mm:ss')))
    .withColumn("BeforeExamSlidePercent", when(col("Timestamp")<col("Exam"), 1).otherwise(0))
    .drop("Exam","ExamMinusWeek","Timestamp","SessionTimestamp")
    .cache()
)

print("Total slide logs: "+str(fullSlides.count()))
print("Slide logs before session: "+str(slideAggregates.count()))
slideAggregates.show(truncate=False)

### Aggregate per User

In [None]:
from pyspark.sql import functions as sf

# Build one row per student with all the features:
#  1. Join the three per-(student, session) video aggregates and the per-slide flags on
#     (UsernameID, Session). Before every join the key columns of the right side are
#     renamed to JoinAttr1/JoinAttr2, so that the equality conditions are unambiguous,
#     and they are dropped right after it.
#  2. Average every feature over all the rows of the student.
#  3. Left join the login counters (students without logins keep NULL counters).
#  4. Inner join the students table to add its remaining columns.
aggregatedPerUser = beforeSessionVideosAggregatedPerSession \
                                          .join(beforeExamVideosAggregatedPerSession \
                                                .select(*["UsernameID", "Session", "BeforeExamVideoPercent", "AvgViewedVideoRepetitions"]) \
                                                .withColumnRenamed("UsernameID", "JoinAttr1") \
                                                .withColumnRenamed("Session", "JoinAttr2"), [col("UsernameID")==col("JoinAttr1"), col("Session")==col("JoinAttr2")], 'inner') \
                                          .drop(*["JoinAttr1", "JoinAttr2"]) \
                                          .join(beforeExamMinusWeekVideosAggregatedPerSession \
                                                .select(*["UsernameID", "Session", "BeforeExamMinusWeekVideoPercent"]) \
                                                .withColumnRenamed("UsernameID", "JoinAttr1") \
                                                .withColumnRenamed("Session", "JoinAttr2"), [col("UsernameID")==col("JoinAttr1"), col("Session")==col("JoinAttr2")], 'inner') \
                                          .drop(*["JoinAttr1", "JoinAttr2"]) \
                                          .join(slideAggregates \
                                                .withColumnRenamed("Username", "JoinAttr1") \
                                                .withColumnRenamed("Session", "JoinAttr2"), [col("UsernameID")==col("JoinAttr1"), col("Session")==col("JoinAttr2")], 'inner') \
                                          .drop(*["JoinAttr1", "JoinAttr2"]) \
                                          .groupBy("UsernameID") \
                                          .agg( \
                                              sf.avg("BeforeSessionVideoPercent").alias("BeforeSessionVideoPercent"), \
                                              sf.avg("BeforeExamVideoPercent").alias("BeforeExamVideoPercent"), \
                                              sf.avg("BeforeExamMinusWeekVideoPercent").alias("BeforeExamMinusWeekVideoPercent"), \
                                              sf.avg("AvgVideoRepetitions").alias("AvgVideoRepetitions"), \
                                              sf.avg("AvgViewedVideoRepetitions").alias("AvgViewedVideoRepetitions"), \
                                              sf.avg("BeforeSessionSlidePercent").alias("BeforeSessionSlidePercent"), \
                                              sf.avg("BeforeExamSlidePercent").alias("BeforeExamSlidePercent"), \
                                              sf.avg("BeforeExamMinusWeekSlidePercent").alias("BeforeExamMinusWeekSlidePercent"), \
                                              sf.avg("SlideRepetitions").alias("AvgSlideRepetitions")
                                              ) \
                                          .join(loginsAggregatedPerStudent.withColumnRenamed("Username", "JoinAttr"), [col("UsernameID")==col("JoinAttr")], 'left') \
                                          .drop("JoinAttr") \
                                          .join(students.withColumnRenamed("UsernameID", "JoinAttr"), [col("UsernameID")==col("JoinAttr")], 'inner') \
                                          .drop("JoinAttr") \
                                          .cache()



# NOTE(review): the join with slideAggregates yields one row per slide of the session, so
# the video aggregates of a session are repeated once per slide before averaging - confirm
# this weighting is intended.
print("All together per user: "+str(aggregatedPerUser.count()))
aggregatedPerUser.sort("BeforeSessionVideoPercent").show(n=20, truncate=False)

In [None]:
# Release memory
for cached in (
    beforeSessionVideoLogs,
    beforeExamMinusWeekVideoLogs,
    beforeExamVideosAggregatedPerSession,
    slideAggregates,
    students,
    loginsAggregatedPerStudent,
):
  cached.unpersist()
del cached
# Remove variables to avoid unwillingly accessing them
del beforeSessionVideoLogs, beforeExamMinusWeekVideoLogs, beforeExamVideosAggregatedPerSession
del slideAggregates, students, loginsAggregatedPerStudent
print("Variables removed!!!")

### Aggregate per Week

In [None]:
from pyspark.sql.functions import next_day, col

# Login days per week. Every date is assigned to the first sessionDay strictly after
# it (next_day), so a week is identified by the session day that closes it.
LoginWeeklySerie = (
    cleanLoginLogs
    .withColumn("Week", next_day(col("Timestamp"), sessionDay))
    .groupBy("Week")
    .count()
    .withColumnRenamed("count", "LoginCount")
)
LoginWeeklySerie.show()

In [None]:
from pyspark.sql.functions import next_day, col, sum

# Video visualizations per week. All the repetitions of a (student, video) pair are
# assigned to the week of its first visualization, because cleanVideoLogs only keeps
# that timestamp.
VideoWeeklySerie = (
    cleanVideoLogs
    .withColumn("Week", next_day(col("Timestamp"), sessionDay))
    .groupBy("Week")
    .agg(sum("VideoRepetitions").alias("VideoCount"))
)
VideoWeeklySerie.show()

In [None]:
from pyspark.sql.functions import next_day, col, sum

# Slide accesses per week. All the repetitions of a (student, slide) pair are assigned
# to the week of its first access, because cleanSlideLogs only keeps that timestamp.
SlideWeeklySerie = (
    cleanSlideLogs
    .withColumn("Week", next_day(col("Timestamp"), sessionDay))
    .groupBy("Week")
    .agg(sum("SlideRepetitions").alias("SlideCount"))
)
SlideWeeklySerie.show()

In [None]:
# One row per week with the three counters. The joins are inner joins, so a week only
# appears when it has logins, video visualizations and slide accesses.
aggregatedPerWeek = (
    LoginWeeklySerie
    .join(VideoWeeklySerie, on="Week", how='inner')
    .join(SlideWeeklySerie, on="Week", how='inner')
    .sort("Week")
)
aggregatedPerWeek.show()

# Visualizations

In [None]:
import os

# Set Charts Path: folder (relative to the working directory) where the figures are saved
chartsPath = "Charts/2425Q1-DBDP1"
# Create it when missing: on a fresh runtime the folder does not exist yet, which made
# both this listing and the later savefig calls fail
os.makedirs(chartsPath, exist_ok=True)
# Show the charts already in the folder
os.listdir(chartsPath)

In [None]:
# Move main aggregated dataframe from Spark to Pandas
df = aggregatedPerUser.toPandas().set_index(['Gender','UsernameID'])
# Every column except the marks is a predictor. The dataframe column order is kept:
# going through a set made the order (and so the layout of any table or model built
# from it) change from one run to the next.
markColumns = ('TestMark', 'ExamMark', 'ExercisesMark')
predictors = [column for column in df.columns if column not in markColumns]
df.to_csv(dataPath+'/pd_AnyTime.csv')
df.iloc[0:5]

In [None]:
# Move timeseries from Spark to Pandas, indexed by week
# (one column per counter: LoginCount, VideoCount and SlideCount)
ts = aggregatedPerWeek.toPandas().set_index('Week')
ts

## Prechecks

In [None]:
# People having a general average of video repetition greater than the average considering only those seen at least once (This should be empty!!!)
# Rows where any of both averages is missing (NaN) never satisfy the comparison.
df[df['AvgVideoRepetitions']>df['AvgViewedVideoRepetitions']]

In [None]:
# Students whose percentage of videos seen before the ExamMinusWeek cut-off differs from
# their percentage seen before the exam, i.e. who first opened some video between both dates
df[df['BeforeExamMinusWeekVideoPercent']!=df['BeforeExamVideoPercent']]

In [None]:
# Students whose percentage of slides opened before the ExamMinusWeek cut-off differs from
# their percentage opened before the exam, i.e. who first opened some slide between both dates
df[df['BeforeExamMinusWeekSlidePercent']!=df['BeforeExamSlidePercent']]

## Univariate analysis

In [None]:
# Summary statistics (count, mean, std, quartiles, min/max) of every numeric column
df.describe()

## Bivariate analysis

In [None]:
import seaborn as sns

# Check distributions between test questions and exercises in the exam:
# scatter of both marks with the distribution of each one on the margins
ax = sns.jointplot(data=df, x='ExercisesMark', y='TestMark', height=3)
ax.savefig(
    chartsPath+"/JointGrid-Text_x_Exercises.pdf",
    format="pdf",
    bbox_inches='tight',
)

### Time series

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

print("Exam date:", examDate.split()[0])
# One horizontal bar chart per counter, with reference lines for the last session
# (green, dotted), the planned exam (red, dashed) and the actual exam (red, solid)
for counter in ts.columns:
  fig, ax = plt.subplots()
  sns.barplot(data=ts, x=counter, y="Week", orient="h", ax=ax)
  ax.axhline(lastSessionDate, color="g", linestyle=":")
  ax.axhline(plannedExamDate, color="r", linestyle="--")
  ax.axhline(examDate.split()[0], color="r", linestyle="-")
  ax.set(ylabel="Semana", xlabel="Número de accesos")
  # The saved chart carries the generic axis label ...
  fig.savefig(chartsPath+"/TimeSerie-"+counter+".pdf", format="pdf", bbox_inches='tight')
  # ... while the one displayed in the notebook shows the name of the counter
  ax.set(xlabel=counter)
  plt.show()

## Clustering

### Agglomerative

In [None]:
# Cluster the marks of students
import matplotlib.pyplot as plt
from sklearn.cluster import AgglomerativeClustering

X = df[['ExercisesMark','TestMark']]
# fit_predict already fits the model; calling fit() first only repeated the work
clustering = AgglomerativeClustering(n_clusters=4)
clusters = clustering.fit_predict(X)

fig, ax = plt.subplots(figsize=(5, 5))
ax.set_title("Agglomerative clustering of marks")
ax.set_xlabel("ExercisesMark")
ax.set_ylabel("TestMark")
scatter = ax.scatter(X['ExercisesMark'], X['TestMark'], c=clusters)
# legend_elements() returns (handles, labels): compute it once
legendHandles, legendLabels = scatter.legend_elements()
ax.legend(handles=legendHandles, labels=legendLabels, title="Labels")
plt.show()

In [None]:
# Replace marks by labels
# NOTE(review): the names are tied to the cluster numbers of the plot above; check them
# against the chart whenever the input data changes.
agglomerativeNames = {0: "GoodTest", 1:"Bad", 2: "Middle", 3:"Good"}
# Work on an explicit copy: adding columns to a slice of df raises SettingWithCopyWarning
dfLabeled = df[predictors].copy()
# Assign the translated labels directly. The previous replace(..., inplace=True) on the
# selected column is chained assignment, which does not update dfLabeled when pandas
# runs with Copy-on-Write. Cluster numbers without a name are kept as they are.
dfLabeled['LabelAgglomerative'] = [agglomerativeNames.get(label, label) for label in clusters.tolist()]
dfLabeled

### K-Means

In [None]:
# Cluster the marks of students
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

X = df[['ExercisesMark','TestMark']]
# A fixed random_state makes the cluster numbering reproducible, which the hard-coded
# number-to-name mapping of the next cell needs. fit_predict already fits the model;
# calling fit() first only repeated the work (with a different random initialization).
clustering = KMeans(n_clusters=3, random_state=0)
clusters = clustering.fit_predict(X)

fig, ax = plt.subplots(figsize=(5, 5))
ax.set_title("K-Means clustering of marks")
ax.set_xlabel("ExercisesMark")
ax.set_ylabel("TestMark")
scatter = ax.scatter(X['ExercisesMark'], X['TestMark'], c=clusters)
# legend_elements() returns (handles, labels): compute it once
legendHandles, legendLabels = scatter.legend_elements()
ax.legend(handles=legendHandles, labels=legendLabels, title="Labels")
plt.show()

In [None]:
# Replace the K-Means cluster numbers by labels
# NOTE(review): the names are tied to the cluster numbers of the plot above; check them
# against the chart whenever the input data or the clustering seed changes.
kmeansNames = {0: "Good", 1:"Bad", 2: "GoodTest"}
# Assign the translated labels directly. The previous replace(..., inplace=True) on the
# selected column is chained assignment, which does not update dfLabeled when pandas
# runs with Copy-on-Write. Cluster numbers without a name are kept as they are.
dfLabeled['LabelKMeans'] = [kmeansNames.get(label, label) for label in clusters.tolist()]
dfLabeled