In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("OULAD Analys;is") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

25/02/12 10:19:04 WARN Utils: Your hostname, alehak-ThinkPad-S5-S540 resolves to a loopback address: 127.0.1.1; using 10.1.0.118 instead (on interface enp3s0)
25/02/12 10:19:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/12 10:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read csv files into spark

In [3]:
data_courses = spark.read.csv("archive/courses.csv", header=True, inferSchema=True)
data_assessments = spark.read.csv("archive/assessments.csv", header=True, inferSchema=True)
data_studentAssessment = spark.read.csv("archive/studentAssessment.csv", header=True, inferSchema=True)
data_studentInfo = spark.read.csv("archive/studentInfo.csv", header=True, inferSchema=True)
data_studentRegistration = spark.read.csv("archive/studentRegistration.csv", header=True, inferSchema=True)
data_studentVle = spark.read.csv("archive/studentVle.csv", header=True, inferSchema=True)

                                                                                

In [4]:
data_courses.createOrReplaceTempView("data_courses")
data_assessments.createOrReplaceTempView("data_assessments")
data_studentAssessment.createOrReplaceTempView("data_studentAssessment")
data_studentInfo.createOrReplaceTempView("data_studentInfo")
data_studentRegistration.createOrReplaceTempView("data_studentRegistration")
data_studentVle.createOrReplaceTempView("data_studentVle")

In [5]:
from pyspark.sql import functions as F

In [6]:
data_studentInfo.count()

                                                                                

32593

# Basic SQL Questions

## 1) List all students in the dataset

In [260]:
data_studentInfo.select("id_student").distinct().count()

28785

## 3) Get all students who studied module ‘AAA’ in presentation ‘2014B’.

In [22]:
# spark syntax

In [261]:
data_studentInfo.select("id_student").where((data_studentInfo["code_module"] == "AAA")
                                            & (data_studentInfo["code_presentation"] == "2014B")).distinct().show()

+----------+
|id_student|
+----------+
+----------+



In [52]:
# sql syntax

In [30]:
# register df as temp view
data_studentInfo.createOrReplaceTempView("data_studentInfo")

In [263]:
res_df = spark.sql("SELECT DISTINCT(id_student) FROM data_studentInfo WHERE code_module = 'AAA' and code_presentation='2014B'")
res_df.count()

0

## 5) Get all students who have withdrawn from their course.

In [46]:
# Students who unregistered have Withdrawn as the value of the final_result in the studentInfo table. 

In [47]:
# spark syntax

In [264]:
withdrawn_df = data_studentInfo.select("id_student").filter(
    data_studentInfo["final_result"] == "Withdrawn").distinct()

In [265]:
withdrawn_df.show()

+----------+
|id_student|
+----------+
|    302550|
|    534217|
|    579256|
|    597532|
|    631190|
|    514006|
|    650961|
|    686093|
|    530113|
|    574830|
|   1619251|
|    442823|
|    512861|
|    587195|
|    697283|
|    535220|
|    580595|
|    594599|
|   2336922|
|    431136|
+----------+
only showing top 20 rows



In [266]:
withdrawn_df.count()

9155

In [53]:
# sql syntax

In [267]:
res_df = spark.sql("SELECT DISTINCT(id_student) FROM data_studentInfo WHERE final_result = 'Withdrawn'")
res_df.count()

9155

## 7) List students who registered late (after course start date).

In [57]:
# use date_registration from studentRegistration

In [56]:
# spark syntax

In [270]:
late_students = data_studentRegistration.select("id_student", "code_module").filter(data_studentRegistration["date_registration"] > 0).distinct()

In [271]:
late_students.count()

233

In [62]:
# sql syntax

In [272]:
data_studentRegistration.createOrReplaceTempView("data_studentRegistration")

In [323]:
res_df = spark.sql("SELECT DISTINCT(id_student) FROM data_studentRegistration WHERE date_registration>0")

In [324]:
res_df.count()

233

In [275]:
res_df.count()

233

## 9) Find the average assessment weight by module.

In [80]:
from pyspark.sql import functions as F

In [79]:
# spark syntax

In [77]:
average_weight_df = data_assessments.groupBy("code_module").avg("weight")

In [78]:
average_weight_df.show()

+-----------+------------------+
|code_module|       avg(weight)|
+-----------+------------------+
|        CCC|              30.0|
|        BBB|19.047619047619047|
|        DDD|22.857142857142858|
|        FFF|15.384615384615385|
|        EEE|              40.0|
|        AAA|33.333333333333336|
|        GGG|              10.0|
+-----------+------------------+



In [82]:
average_weight_df = data_assessments.groupBy("code_module").agg(F.avg("weight").alias("average_weight"))

In [83]:
average_weight_df.show()

+-----------+------------------+
|code_module|    average_weight|
+-----------+------------------+
|        CCC|              30.0|
|        BBB|19.047619047619047|
|        DDD|22.857142857142858|
|        FFF|15.384615384615385|
|        EEE|              40.0|
|        AAA|33.333333333333336|
|        GGG|              10.0|
+-----------+------------------+



In [84]:
# sql syntax

In [85]:
data_assessments.createOrReplaceTempView("data_assessments")

In [89]:
res_df = spark.sql("SELECT code_module, AVG(weight) as avg_weigth FROM data_assessments GROUP BY code_module")

In [90]:
res_df.show()

+-----------+------------------+
|code_module|        avg_weigth|
+-----------+------------------+
|        CCC|              30.0|
|        BBB|19.047619047619047|
|        DDD|22.857142857142858|
|        FFF|15.384615384615385|
|        EEE|              40.0|
|        AAA|33.333333333333336|
|        GGG|              10.0|
+-----------+------------------+



## 11) Find students who have never submitted an assessment.

In [94]:
# spark syntax

In [277]:
noAssesments_student_df = data_studentInfo.join(data_studentAssessment, on=["id_student"], how="left_anti").select("id_student").distinct()

In [278]:
noAssesments_student_df.show()

+----------+
|id_student|
+----------+
|    302550|
|    534217|
|    579256|
|    631190|
|    514006|
|    621856|
|   1619251|
|    442823|
|    580595|
|    594599|
|    510532|
|    636590|
|    650829|
|    651481|
|    681960|
|   2355701|
|    566970|
|    595515|
|    247653|
|    699140|
+----------+
only showing top 20 rows



In [279]:
noAssesments_student_df.count()

                                                                                

5416

In [98]:
# sql syntax

In [280]:
data_studentAssessment.createOrReplaceTempView("data_studentAssessment")

In [281]:
# with anti join
res_df = spark.sql("SELECT DISTINCT(id_student) FROM data_studentInfo LEFT ANTI JOIN data_studentAssessment USING (id_student)")

In [282]:
res_df.count()

5416

In [283]:
# with not exists
res_df = spark.sql("""
            SELECT 
            DISTINCT(id_student) 
            FROM data_studentInfo d_si
            WHERE NOT EXISTS (
            SELECT 1 FROM data_studentAssessment d_sa WHERE d_sa.id_student = d_si.id_student)""")

In [284]:
res_df.count()

5416

# Intermediate SQL Questions

## 1) Get the average score per module.

In [285]:
# join assessments with studentRegistration, group by code_module

In [286]:
# spark syntax

In [287]:
avgScored_df = data_assessments.join(data_studentAssessment, on=["id_assessment"]) \
        .groupBy("code_module").agg(F.avg("score").alias("avg_score")).select("code_module", "avg_score")

In [288]:
avgScored_df.show()

+-----------+-----------------+
|code_module|        avg_score|
+-----------+-----------------+
|        CCC| 73.2613978551429|
|        BBB| 76.7063682263431|
|        DDD|70.09079993509654|
|        FFF|77.70759006007047|
|        EEE|81.18006593963987|
|        AAA|69.03051493960585|
|        GGG|79.70049293460401|
+-----------+-----------------+



In [289]:
# sql syntax

In [290]:
data_assessments.createOrReplaceTempView("data_assessments")

In [131]:
res_df = spark.sql("""
    SELECT code_module, AVG(score) as avg_score
    FROM data_assessments d_a
    JOIN data_studentAssessment d_sa
    ON d_a.id_assessment = d_sa.id_assessment
    GROUP BY code_module
    """)

In [132]:
res_df.show()



+-----------+-----------------+
|code_module|        avg_score|
+-----------+-----------------+
|        CCC| 73.2613978551429|
|        BBB| 76.7063682263431|
|        DDD|70.09079993509654|
|        FFF|77.70759006007047|
|        EEE|81.18006593963987|
|        AAA|69.03051493960585|
|        GGG|79.70049293460401|
+-----------+-----------------+



                                                                                

In [133]:
# check if two dfs are equal
avgScored_df.collect() == res_df.collect()

                                                                                

True

## 2) Find the students who scored the highest in any assessment.

In [7]:
from pyspark.sql.window import Window

In [8]:
# spark syntax - using rank window function

In [9]:
windowPartition = Window.partitionBy("id_assessment").orderBy(F.desc("score"))

In [10]:
best_students = data_studentAssessment.withColumn("score_rank", F.rank().over(windowPartition))\
        .filter("score_rank=1")\
        .select("id_student").distinct()

In [12]:
best_students.count()

                                                                                

7764

In [13]:
# sql syntax - using cte to find max results

In [15]:
res_df = spark.sql("""
    WITH max_scores AS (
        SELECT MAX(score) as max_score, id_assessment
        FROM data_studentAssessment
        GROUP BY id_assessment
    )
    SELECT 
    DISTINCT(id_student)
    FROM data_studentAssessment d_sa
    WHERE score IN ( SELECT 
                        max_score 
                    FROM max_scores 
                    WHERE 
                        d_sa.id_assessment = max_scores.id_assessment    
                        );
""")

In [16]:
res_df.count()

                                                                                

7764

## 3) Calculate the overall average score of students who passed.

In [291]:
# spark syntax

In [292]:
data_studentInfo.select("final_result").distinct().show()

+------------+
|final_result|
+------------+
|        Fail|
| Distinction|
|   Withdrawn|
|        Pass|
+------------+



In [49]:
data_Assesment_ext = data_studentAssessment.join(data_assessments, on=["id_assessment"])

In [50]:
avg_score = data_studentInfo.join(data_Assesment_ext, on=["id_student", "code_module", "code_presentation"])\
        .filter((F.col("final_result").isin("Pass", "Distinction"))).agg(F.avg("score").alias("avg_passed_score"))

In [51]:
avg_score.collect()

                                                                                

[Row(avg_passed_score=79.1411761149361)]

In [296]:
# sql syntax

In [54]:
res_df = spark.sql("""
    SELECT AVG(d_sa.score) as avg_passed_score
    FROM data_studentAssessment d_sa
    JOIN data_assessments d_a
        ON d_sa.id_assessment = d_a.id_assessment
    JOIN data_studentInfo d_si
        ON d_si.id_student = d_sa.id_student
        AND d_si.code_module = d_a.code_module
        AND d_si.code_presentation = d_a.code_presentation
    WHERE d_si.final_result IN ('Pass', 'Distinction');
""")


In [55]:
res_df.collect()

[Row(avg_passed_score=79.1411761149361)]

## 4) Find courses where more than 50% of students passed.

In [31]:
# pyspark syntax

In [30]:
data_studentInfo.groupBy("code_module", "code_presentation")\
    .agg((F.count(F.when(F.col("final_result") == 'Pass',1)) / F.count("id_student") * 100).alias("pass_percent"))\
    .filter("pass_percent > 50.0").show()

+-----------+-----------------+-----------------+
|code_module|code_presentation|     pass_percent|
+-----------+-----------------+-----------------+
|        AAA|            2014J|62.73972602739726|
|        AAA|            2013J|67.36292428198433|
+-----------+-----------------+-----------------+



In [33]:
# sql syntax

In [45]:
res_df = spark.sql("""
        SELECT code_module, code_presentation, COUNT(CASE WHEN final_result = 'Pass' THEN 1 ELSE NULL END)/COUNT(id_student)*100.0 as pass_percent
        FROM data_studentInfo
        GROUP BY code_module, code_presentation
        HAVING COUNT(CASE WHEN final_result = 'Pass' THEN 1 ELSE NULL END)/COUNT(id_student)*100.0 > 50.0
        """)

In [46]:
res_df.show()

+-----------+-----------------+-----------------+
|code_module|code_presentation|     pass_percent|
+-----------+-----------------+-----------------+
|        AAA|            2014J|62.73972602739726|
|        AAA|            2013J|67.36292428198433|
+-----------+-----------------+-----------------+



## 5) Find students with a final result of ‘Distinction’ but never submitted an assessment.

In [320]:
# pyspark syntax
data_Assesment_ext = data_studentAssessment.join(data_assessments, on=["id_assessment"])

In [321]:

fav_students = data_studentInfo.join(data_Assesment_ext, on=["id_student", "code_module", "code_presentation"], how="leftanti")\
            .filter(data_studentInfo["final_result"] == "Distinction").distinct()

In [322]:
fav_students.show()



+----------+-----------+-----------------+------+------+-----------------+--------+--------+--------------------+---------------+----------+------------+
|id_student|code_module|code_presentation|gender|region|highest_education|imd_band|age_band|num_of_prev_attempts|studied_credits|disability|final_result|
+----------+-----------+-----------------+------+------+-----------------+--------+--------+--------------------+---------------+----------+------------+
+----------+-----------+-----------------+------+------+-----------------+--------+--------+--------------------+---------------+----------+------------+



                                                                                

In [302]:
# sql syntax

In [157]:
res_df = spark.sql("""
    WITH assessments_ext AS (
        SELECT id_student FROM
        data_assessments d_a
        JOIN data_studentAssessment d_sa
        USING (id_assessment)
    )
    SELECT DISTINCT(id_student) 
    FROM data_studentInfo
    LEFT ANTI JOIN assessments_ext
    USING (id_student)
    WHERE final_result = 'Distinction'
""")

In [158]:
res_df.count()

                                                                                

0

In [317]:
res_df = spark.sql("""
    SELECT DISTINCT(id_student)
    FROM data_studentInfo d_si
    WHERE final_result = 'Distinction'
    AND NOT EXISTS (
        SELECT 1 FROM
        data_assessments d_a
        JOIN data_studentAssessment d_sa
        USING (id_assessment)
        WHERE d_sa.id_student = d_si.id_student
    )
""")

In [318]:
res_df.show()



+----------+
|id_student|
+----------+
+----------+



                                                                                

## 6) Find assessments where more than 80% of students scored below 50.

In [57]:
# spark syntax

In [75]:
hard_assessments = data_studentAssessment.groupBy("id_assessment").agg((F.sum(F.when(F.col("score") < 50, 1))/F.count("id_student") * 100.0).alias("below_p"))\
    .filter(F.col("below_p") > 80).select("id_assessment")

In [76]:
hard_assessments.count()

0

In [74]:
# sql syntax

In [79]:
res_df = spark.sql(
    """
    SELECT id_assessment
    FROM data_studentAssessment
    GROUP BY id_assessment
    HAVING SUM(CASE WHEN score < 50 THEN 1 END)/COUNT(*) * 100.0 > 80.0
    """
)

In [81]:
res_df.count()

                                                                                

0

## 7) Find students who submitted the same assessment multiple times.

In [327]:
# pyspark syntax

In [351]:
mutiple_tries_df = data_studentInfo.join(data_studentAssessment, on=["id_student"])\
        .groupBy("id_student", "id_assessment").agg(F.count("id_assessment").alias("n_submissions"))\
        .filter(F.col("n_submissions") > 1)

In [352]:
mutiple_tries_df.select("id_student").distinct().count()

                                                                                

3136

In [353]:
# sql syntax

In [361]:
res_df = spark.sql("""
        SELECT DISTINCT(d_si.id_student)
        FROM data_studentInfo d_si
        JOIN data_studentAssessment d_sa
        ON d_si.id_student = d_sa.id_student
        GROUP BY d_si.id_student, id_assessment
        HAVING COUNT(id_assessment) > 1
        """)

In [362]:
res_df.count()

3136

## 8) Find courses with the highest dropout rate.

In [97]:
# spark syntax

In [89]:
data_studentInfo.select("final_result").distinct().show()

+------------+
|final_result|
+------------+
|        Fail|
| Distinction|
|   Withdrawn|
|        Pass|
+------------+



In [91]:
droupout_df = data_studentInfo.groupBy("code_module", "code_presentation")\
        .agg((F.sum(F.when(F.col("final_result") == 'Withdrawn', 1)) / F.count("id_student") ).alias("dropout_rate"))

In [93]:
droupout_df.orderBy(F.desc("dropout_rate")).show(5)

[Stage 149:>                                                        (0 + 1) / 1]

+-----------+-----------------+-------------------+
|code_module|code_presentation|       dropout_rate|
+-----------+-----------------+-------------------+
|        CCC|            2014B|0.46384297520661155|
|        CCC|            2014J| 0.4311449159327462|
|        DDD|            2014B| 0.3990228013029316|
|        FFF|            2014J|0.36152219873150104|
|        DDD|            2014J|0.35884636716583473|
+-----------+-----------------+-------------------+
only showing top 5 rows



                                                                                

In [98]:
# sql syntax

In [96]:
res_df = spark.sql(
    """
    SELECT 
        code_module, code_presentation,
        SUM(CASE WHEN final_result = 'Withdrawn' THEN 1 END)/COUNT(id_student) AS dropout_rate
    FROM data_studentInfo 
    GROUP BY code_module, code_presentation
    ORDER BY dropout_rate DESC
    LIMIT 5
    """)

In [99]:
res_df.show()

+-----------+-----------------+-------------------+
|code_module|code_presentation|       dropout_rate|
+-----------+-----------------+-------------------+
|        CCC|            2014B|0.46384297520661155|
|        CCC|            2014J| 0.4311449159327462|
|        DDD|            2014B| 0.3990228013029316|
|        FFF|            2014J|0.36152219873150104|
|        DDD|            2014J|0.35884636716583473|
+-----------+-----------------+-------------------+



## 9) Calculate the number of interactions (clicks) per student.

In [368]:
total_clicks = data_studentVle.groupBy("id_student").agg(F.sum("sum_click").alias("total_clicks"))\
        .select("id_student", "total_clicks")

In [371]:
total_clicks.orderBy("total_clicks", ascending=False).show()



+----------+------------+
|id_student|total_clicks|
+----------+------------+
|     80868|       28615|
|    630905|       28264|
|    537811|       25159|
|    619927|       24368|
|    434476|       21204|
|    517269|       21123|
|    611417|       20391|
|    622374|       19928|
|    368315|       19734|
|    607900|       19673|
|    583487|       19461|
|    644361|       19415|
|    499813|       19199|
|   2088765|       19196|
|    898594|       19126|
|    560431|       18039|
|   2063578|       17957|
|    298040|       17503|
|    609229|       17482|
|    617580|       17481|
+----------+------------+
only showing top 20 rows



                                                                                

In [372]:
# sql syntax

In [374]:
data_studentVle.createOrReplaceTempView("data_studentVle")

In [379]:
res_df = spark.sql("""
    SELECT id_student, SUM(sum_click) as total_clicks
    FROM data_studentVle
    GROUP BY id_student
    ORDER BY total_clicks ASC
    """)

In [380]:
res_df.show()



+----------+------------+
|id_student|total_clicks|
+----------+------------+
|    102849|           1|
|    500031|           1|
|    689107|           1|
|    623319|           1|
|    628053|           1|
|    516447|           1|
|    485782|           1|
|    654888|           1|
|    646455|           1|
|    693934|           1|
|    521666|           1|
|    592124|           1|
|    639232|           1|
|    651831|           1|
|    594208|           1|
|    685841|           1|
|    611852|           1|
|    595877|           1|
|    552640|           1|
|    554766|           1|
+----------+------------+
only showing top 20 rows



                                                                                

## 10) Find the median score per module.

In [106]:
# pyspark syntax

In [114]:
median_module_scores = data_studentAssessment.join(data_assessments, on=["id_assessment"])\
    .groupBy("code_module")\
    .agg(F.median("score").alias("med_score"))

In [116]:
median_module_scores.orderBy(F.desc("med_score")).show()



+-----------+---------+
|code_module|med_score|
+-----------+---------+
|        EEE|     85.0|
|        BBB|     80.0|
|        FFF|     80.0|
|        GGG|     80.0|
|        CCC|     78.0|
|        DDD|     74.0|
|        AAA|     71.0|
+-----------+---------+



                                                                                

In [107]:
# sql syntax

In [117]:
res_df = spark.sql(
    """
    SELECT code_module, MEDIAN(score) as med_score
    FROM data_studentAssessment
    JOIN data_assessments
    USING (id_assessment)
    GROUP BY code_module
    ORDER BY med_score DESC
    """)

In [118]:
res_df.show()



+-----------+---------+
|code_module|med_score|
+-----------+---------+
|        EEE|     85.0|
|        BBB|     80.0|
|        FFF|     80.0|
|        GGG|     80.0|
|        CCC|     78.0|
|        DDD|     74.0|
|        AAA|     71.0|
+-----------+---------+



                                                                                

## 11) Calculate submission delay patterns.

In [136]:
# both date_submitted in data_studentAssessment and 
# date in data_assessments are relative: the number of days 
#                                            since the start of the 
#                                            module-presentation

In [142]:
delay_submissions = data_studentAssessment.join(data_assessments, on=["id_assessment"])\
    .withColumn("delay_days", F.col("date") - F.col("date_submitted"))\
    .filter(F.col("delay_days") < 0)

In [150]:
# average delay days per code_module
delay_submissions.groupBy("code_module").agg(F.mean("delay_days").alias("avg_delay_delays")).show()



+-----------+-------------------+
|code_module|   avg_delay_delays|
+-----------+-------------------+
|        CCC|-37.929292157162514|
|        BBB|  -4.67028613352898|
|        DDD|-28.424370636767506|
|        FFF| -9.091562883704613|
|        EEE| -8.336221837088388|
|        AAA|-7.8049079754601225|
|        GGG| -8.507832898172325|
+-----------+-------------------+



                                                                                

## 12) Compare performance across age bands.

In [119]:
# find avergage scores per each age group

In [133]:
data_studentInfo.join(data_studentAssessment, on="id_student")\
    .groupBy("age_band")\
    .agg(F.mean("score").alias("avg_score"), 
         F.median("score").alias("med_score"),
         F.stddev("score").alias("stddev_score")).show()



+--------+-----------------+---------+------------------+
|age_band|        avg_score|med_score|      stddev_score|
+--------+-----------------+---------+------------------+
|    0-35|74.55190257051278|     78.0| 19.11424095514945|
|    55<=|79.57217165149545|     83.0| 17.35466631878093|
|   35-55|77.25301108206502|     80.0|18.901193297000425|
+--------+-----------------+---------+------------------+



                                                                                

In [124]:
# sql syntax

In [134]:
res_df = spark.sql(
    """
    SELECT age_band, AVG(score)
    FROM data_studentInfo 
    JOIN data_studentAssessment
    USING (id_student)
    GROUP BY age_band
    """
)

In [127]:
res_df.show()



+--------+-----------------+
|age_band|       avg(score)|
+--------+-----------------+
|    0-35|74.55190257051278|
|    55<=|79.57217165149545|
|   35-55|77.25301108206502|
+--------+-----------------+



                                                                                

# List differences between pyspark sql and sl syntax