In [1]:
import os.path
import random

from pyspark import Row, SparkConf, SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

# from src.core import init_spark
from src.preprocessing.loader import load
import re
from pyspark.sql.functions import udf, regexp_replace, split, col, desc, collect_list, substring, avg, collect_set
from pyspark.sql.types import StringType, DoubleType, FloatType
from pyspark.sql.functions import concat_ws
import pickle
import pandas as pd
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import size, array_intersect, when

# PySPark's Word2Vec for a fixed size vectors
from pyspark.ml.feature import CountVectorizer


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/11 13:02:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<h1> Setup

Code used to facilitate data preprocessing and more.

In [2]:
pattern = re.compile(r'http:\/\/dbpedia\.org\/resource\/')


def replace_pattern(text):
    return [pattern.sub(r' ', t) for t in text]

In [3]:
def init_spark(app_name: str, executor_memory: str, executor_cores: int, driver_memory: str,
               max_task_retries: int, max_failures: int):

    conf = SparkConf()
    conf.setAppName(app_name)
    conf.set('spark.executor.memory', executor_memory)
    conf.set('spark.executor.cores', executor_cores)
    conf.set('spark.driver.memory', driver_memory)
    conf.set('spark.task.maxFailures', max_failures)
    conf.set('spark.task.maxTaskAttempts', max_task_retries)

    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    return spark

<h1> Initialization of Spark Application

In [4]:
#stops any existing Spark running to allow
spark = SparkSession.builder.appName("my_app1").getOrCreate()
spark.stop()

23/04/11 13:02:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
df = load()
spark = init_spark('my_app1', '4g', 6, '4g', 10, 20)
df = spark.createDataFrame(df)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


<h1> Data Preprocessing (Content-Based)

Uses Count Vectorizer to create vectors that will be evaluated using a cosine similarity
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.CountVectorizer.html

In [6]:

# df.printSchema()
# print(df.count())
udf_replace_pattern = udf(replace_pattern, StringType())

df = df.withColumn('concepts_isolated', udf_replace_pattern('concepts'))
df = df.withColumn("concepts_isolated_no_brackets", regexp_replace("concepts_isolated", r"\[|\]", ""))

df = df.withColumn('concatenated_columns',
                   concat_ws(" ", "subject", "catalog", "career", "credit", "concepts_isolated_no_brackets"))
df = df.withColumn("array_features", split("concatenated_columns", ",")  )# puts the concepts into an array for Word2Vec
# df.select("array_features").show(10, truncate=False)

cv = CountVectorizer(inputCol="array_features", outputCol="featuresCountVec")
cv.binary = True
cv.setMinDF = 2
cv.setMinTF = 1
model = cv.fit(df)
result = model.transform(df)
# result.select("featuresCountVec").show(10, truncate=False)

# dropping everything except the course code and the countvectorizer
result = result.drop('ID', 'title', 'subject', 'catalog', 'career', 'credit', 'requisites', 'description', 'concepts', 'concepts_isolated', 'concepts_isolated_no_brackets', 'concatenated_columns', 'array_features')
# result.show(10, truncate=False)

# useful to see the overall columns and metadata
result.printSchema()
# (training, test) = result.randomSplit

                                                                                

root
 |-- code: string (nullable = true)
 |-- featuresCountVec: vector (nullable = true)



<h1> Content Based Recommender (Cosine Similarity)

In [7]:
def cosine_similarity(vec1, vec2):
    return float(vec1.dot(vec2) / (vec1.norm(2) * vec2.norm(2)))

In [8]:
# Register the cosine similarity function as a UDF
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

# CrossJoin the DataFrame with itself to get all pairs of rows
df_pairs = result.alias("a").crossJoin(result.alias("b"))

# Register the UDF
spark.udf.register("cosine_similarity_udf", cosine_similarity, DoubleType())

# Calculate the cosine similarity for each pair of rows
df_similarity = df_pairs.selectExpr(
    "a.code as code1",
    "b.code as code2",
    "cosine_similarity_udf(a.featuresCountVec, b.featuresCountVec) as similarity"
)

# It won't display the CosineSimilarity for identical course codes
# And won't display redundant rows (ex: COMP352 | COMP346 | 0.4
#                                       COMP346 | COMP352 | 0.4 )
df_similarity = df_similarity.filter(col("code1") < col("code2"))
#df_similarity.printSchema()
# df_similarity.show(50)

#can set the minimum threshhold for course similarity.
df_similarity = df_similarity.filter(col("similarity") > 0)
df_similarity = df_similarity.orderBy('similarity', ascending=False)
# df_similarity.show(50)


In [9]:
# df_similarity.show(200)

<h1> Loads Pickled Content Based Model's Results

In [10]:
try:
    with open('similarity.pkl', 'rb') as file:
        pandas = pickle.load(file)
        pickleDF = spark.createDataFrame(pandas)
        pickleDF = pickleDF.orderBy("similarity", ascending=True)
        # pickleDF.show(10000)
        # print(pickleDF.count())
except:
    with open('similarity.csv', 'rb') as file:
        pandas = pd.read_csv(file)
        pickleDF = spark.createDataFrame(pandas)
        pickleDF.show()
        # print(pickleDF.count())


<h1>Evaluation (Content Based)

Using the initial dataset that uses "ID",'title','subject', 'catalog', 'career', 'credit', 'description','concepts', 'code, 'requisites' to create our truth values based on the idea that requisite courses that are assigned by an institution for a course should be relevant/recommended.
Source: https://bond-kirill-alexandrovich.medium.com/precision-and-recall-in-recommender-systems-and-some-metrics-stuff-ca2ad385c5f8

    
A "Precision" column representing the Precision metric:
    $$ \frac{RecommendedCourses \cap requisites}{RecommendedCourses}$$
    
A "Recall" column representing the Recall metric:
    $$ \frac{RecommendedCourses \cap requisites}{requisites}$$
    
A "F1-score" column representing the F1-score metric:
    $$ \frac{(1+\beta^2) * Precision * Recall}{\beta^2 Precision + Recall}$$

$\beta = 1$: Precision and Recall have equal importance
$\beta > 1$: Recall has more importance
$\beta < 1$: Precision has more importance

In [11]:
# group by code1 of similarity then for each code1 compare how many of the pre-reqs are in that list
recommender = pickleDF.groupBy("code1").agg(collect_list(col("code2")).alias("RecommendedCourses"))
#renamed code1 to Course since that is the origin point that someone would see
try:
    recommender = recommender.withColumnRenamed("code1", "Course")
except AnalysisException as e:
    print(e)

#uses the initial dataset that uses "ID",'title','subject', 'catalog', 'career', 'credit', 'description','concepts', 'code, 'requisites' to create our truth values
wholeDF = load()
truthDF = spark.createDataFrame(wholeDF)
truthDF = truthDF.drop("ID",'title','subject', 'catalog', 'career', 'credit', 'description','concepts')

#merging our truth values with the predictions
merged = truthDF.join(recommender, recommender.Course==truthDF.code)
merged = merged.drop("Course")
merged.show(10)

'''
Sources:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.evaluation.RankingMetrics.html
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
'''

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+-------+--------------------+--------------------+
|   code|          requisites|  RecommendedCourses|
+-------+--------------------+--------------------+
|ACCO220|[ACCO218, ACCO213...|[CENT1100, CESF20...|
|ACCO230|[ACCO365, COMM305...|[FINA408, FINA402...|
|ACCO240|[COMM305, ACCO218...|[CENT1100, CESF20...|
|ACCO310|[COMM305, ACCO218...|[CHEM242, COMP478...|
|ACCO320|[ACCO400, ACCO460...|[ECON643, GPWL939...|
|ACCO330|[COMM305, ACCO218...|[BIOL322, COMP476...|
|ACCO340|[ACCO440, ACCO213...|[GPLL258, GPWL939...|
|ACCO350|[COMM305, ACCO218...|[COMP445, ENCS681...|
|ACCO355|                  []|[IRST404, BIOL452...|
|ACCO360|  [COMM305, ACCO465]|[ACCO455, CEPS109...|
+-------+--------------------+--------------------+
only showing top 10 rows



'\nSources:\nhttps://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.evaluation.RankingMetrics.html\nhttps://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems\n'

In [12]:
def calculate_evaluation_metrics(df, betaValue):
    # Calculate the size of the intersection between RecommendedCourses and requisites
    intersection_size = size(array_intersect(col("RecommendedCourses"), col("requisites")))
    beta = betaValue
    # Calculate precision, recall, and F1-score
    precision = when(col("RecommendedCourses").isNotNull(), intersection_size/size(col("RecommendedCourses"))).otherwise(None)
    recall = when(col("requisites").isNotNull(), intersection_size/size(col("requisites"))).otherwise(None)
    f1_score = when(precision.isNotNull() & recall.isNotNull(), ((1+beta*beta)*precision*recall)/(beta*beta*precision+recall)).otherwise(None)

    # Add the new columns
    df = df.withColumn("Precision", precision)
    df = df.withColumn("Recall", recall)
    df = df.withColumn("F1-score", f1_score)
    df = df.orderBy(desc("F1-score"))
    return df

<h1> Evalaution of Content Based

In [14]:
with open('similarity.pkl', 'rb') as file:
    pandas = pickle.load(file)
    pickleDF = spark.createDataFrame(pandas)
    rdd = spark.read.text("content_based_student_courses").rdd
    studentCourses = []
    for l in rdd.collect():
        studentCourses.append(l[0])
    allRequisitesList = []
    truthDfAllCourseRequisites = truthDF.filter(col("code").isin(studentCourses)).drop("code")
    
    for courseRequisitesRow in truthDfAllCourseRequisites.collect():
        for courseRequisites in courseRequisitesRow[0]:
            allRequisitesList.append(courseRequisites)

    print("Prerequisite courses: " + str(allRequisitesList))


    filteredDf = pickleDF.filter(col("code1").isin(studentCourses)).drop("code1", "similarity").orderBy(col('similarity'), ascending=False).limit(10).withColumnRenamed("code2", "similar_courses")
    recommendedCourseList = []
    for recommendedCourseRow in filteredDf.collect():
        for recommendedCourse in recommendedCourseRow:
            recommendedCourseList.append(recommendedCourse)

    print("Recommended courses: " + str(recommendedCourseList))

Prerequisite courses: ['ACCO218', 'ACCO213', 'ACCO240', 'ACCO230', 'ACCO365', 'COMM305', 'COMM308', 'ACCO340', 'ACCO345', 'ACCO213', 'IBUS471', 'ACCO240', 'ACCO470', 'ACCO455', 'COMM217', 'ACCO220', 'COMM305', 'ACCO218', 'ACCO330', 'ACCO310', 'MARK462', 'IBUS471', 'ACCO350', 'ACCO470', 'ACCO355', 'IBUS462', 'ACCO220', 'COMM305', 'ACCO218', 'ACCO320', 'ACCO240', 'ACCO350', 'ACCO400', 'ACCO460', 'ACCO310', 'ACCO410', 'ACCO450', 'ACCO323', 'ACCO326', 'ACCO420', 'ACCO440', 'ACCO213', 'ACCO441', 'ACCO230', 'COMM217', 'COMM305', 'ACCO218', 'COMM301', 'ACCO414', 'COMM226', 'ACCO240', 'COMM217', 'ACCO230', 'ACCO461', 'ACCO320', 'ACCO326', 'ACCO410', 'ACCO320', 'ACCO326', 'ACCO330', 'ACCO303', 'ACCO435', 'ACCO422', 'ACCO330', 'ACCO320', 'COMM401', 'ACCO490', 'ACCO442', 'ACCO441', 'ACCO340', 'COMM315', 'COMM217', 'ACCO230', 'ACCO320', 'ACCO470', 'ACCO360', 'ACCO643', 'ACCO678', 'ACCO613', 'MBA642', 'MBA607', 'MBA642', 'CART211', 'CART251', 'DFAR251', 'DFAR351', 'DFAR350', 'DANC305', 'DANC410', '

In [15]:
comparisonDf = pd.DataFrame([[allRequisitesList,recommendedCourseList]],columns=['requisites','RecommendedCourses'])

comparisonDf = spark.createDataFrame(comparisonDf)
calculate_evaluation_metrics(comparisonDf, 1.5).select("F1-score").agg(avg('F1-score')).show(20)
calculate_evaluation_metrics(comparisonDf, 1.0).select("F1-score").agg(avg('F1-score')).show(20)
calculate_evaluation_metrics(comparisonDf, 0.5).select("F1-score").agg(avg('F1-score')).show(20)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+--------------------+
|       avg(F1-score)|
+--------------------+
|0.023787740164684358|
+--------------------+

+-------------------+
|      avg(F1-score)|
+-------------------+
|0.03149606299212599|
+-------------------+

+-------------------+
|      avg(F1-score)|
+-------------------+
|0.06369426751592357|
+-------------------+



<h1> Metrics

<h2> Precision:

This metric represents the proportion of relevant courses among the courses recommended by the model.
A high precision means that the model is recommending mostly relevant courses, while a low precision means that the model is recommending many irrelevant courses. In the context of this problem, precision tells you how often the courses recommended by the model are actually requisites for the target course. A precision of 1.0 means that all the courses recommended by the model are requisites for the target course, while a precision of 0.0 means that none of the courses recommended by the model are requisites for the target course.

<h2>Recall

This metric represents the proportion of relevant courses that are correctly identified by the model. A high recall
means that the model is able to identify most of the relevant courses, while a low recall means that the model is missing many relevant courses. In the context of this problem, recall tells you how often the model is able to identify the requisites for the target course. A recall of 1.0 means that the model is able to identify all the requisites for the target course, while a recall of 0.0 means that the model is unable to identify any of the requisites for the target course.

<h2>F1-score

A high F1-score means that the model is able to achieve high precision and high recall at the same time, while a low F1-score means that the model is struggling to achieve both precision and recall. In this situation, F1-score tells us how well the model is able to identify the requisites for the target course while minimizing the number of irrelevant courses recommended.

In [17]:
# print(f"Nb of partitions Cosine: {df_similarity.rdd.getNumPartitions()}")

<h1>Collaborative Filtering Model</h1>
Scale of ratings:
1-5 (lowest to highest)

<h2>How synthetic data was generated:</h2>

Since all students take on average 35 courses in their degree, that can be seen as around 0.005% of all courses. This was our first approach at generating all the synthetic data. However, after improving our algorithm, we decided to generate the data using a more realistic approach. We filtered out all different disciplines and generated 10 students for each. On average, for example, a software engineering student will take 60% of all software engineering classes available as there are a lot of electives that are optional. For courses in their program, the student rates the course between 3-5 stars since it is in their domain and they are more likely to like these courses. For courses not in their program, they have a 0.2% of rating the course and these ratings are between 1-5 stars since there is a chance they rate courses outside their degree low. Every student for every program iterate over all courses and apply these rules over all courses.

In [18]:
#First, we need to extract all unique class types
codeDf = df.select(substring(df.code, 1,4)).distinct().orderBy(df["code"])
courseList = []
for i in codeDf.collect():
    course = i[0]
    if any(char.isdigit() for char in course):
        course = course[0:3]
    courseList.append(course)
courseList = list(dict.fromkeys(courseList))

if not os.path.isfile("courses.txt"):
    with open("courses.txt", 'w') as f:
        f.write('\n'.join(courseList))

allCourses = df.select("code")
allCoursesList = [row[0] for row in allCourses.collect()]

if not os.path.isfile("allCourses.txt"):
    with open("allCourses.txt", "w") as f:
        for i in range(0, len(allCoursesList)):
            f.write(allCoursesList[i] + "::" + str(i) + "\n")

In [19]:
#Then, we need to generate synthetic data for our model to train on.
# spark = init_spark()
rddCourses = spark.read.text("courses.txt").rdd
rddAllCourses = spark.read.text("allCourses.txt").rdd
splitRdd = rddAllCourses.map(lambda x: x.value.split("::"))

studentsPerProgram = 10
if not os.path.isfile("syntheticData.txt"):
    with open("syntheticData.txt", "w") as f:
        times = 0
        for courseType in rddCourses.collect():
            for student in range(0,studentsPerProgram):
                for uniqueCourse in splitRdd.collect():
                    if re.search(courseType[0], uniqueCourse[0]):
                        rand = random.randint(1, 10)
                        if rand <= 6:
                            rating = random.uniform(3.0,5.0)
                            f.write(str(student + times) + "::" + str(uniqueCourse[1]) + "::" + str(uniqueCourse[0]) + "::" + str(rating) + "\n")
                    else:
                        rand = random.randint(1,500)
                        if rand == 1:
                            notInProgramRating = random.uniform(1.0, 5.0)
                            f.write(str(student + times) + "::" + str(uniqueCourse[1]) + "::" + str(uniqueCourse[0]) + "::" + str(notInProgramRating) + "\n")
            times = times + studentsPerProgram

In [28]:
from pyspark.sql.types import StructType, StructField, IntegerType
# spark = init_spark()
import time
start = time.time()
lines = spark.read.text("syntheticData.txt").rdd
lineArr = lines.map(lambda line: line.value.split("::"))

ratingsRdd = lineArr.map(
    lambda line: Row(studentId=int(line[0]), classId=int(line[1]), className=str(line[2]), rating=float(line[3])))

ratingsDF = spark.createDataFrame(ratingsRdd)

(training, test) = ratingsDF.randomSplit([0.8,0.2])

als = ALS(userCol="studentId", itemCol="classId", ratingCol="rating", coldStartStrategy="drop", rank=70, regParam=0.1)
model = als.fit(training)


predictions = model.transform(test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

#Example for 1 user.
userId = 0

userRecs = model.recommendForUserSubset(ratingsDF.filter(col("studentId") == userId), 10)

print(f"Model Training Duration {time.time()-start} seconds")

userRecs.show(10)

recommendedCoursesCFList = []
for user in userRecs.collect():
    user_id = user[0]
    s = user_id
    for item in user[1]:
        item_name = training.filter(col("classId") == item[0]).collect()[0][2]
        recommendedCoursesCFList.append(item_name)

s = ""
with open("classRecommendationsCF.txt", "w") as f:
    for rec in userRecs.collect():
        user_id = rec[0]
        s = user_id
        for item in rec["recommendations"]:
            item_name = ratingsDF.filter(col("classId") == item[0]).collect()[0][2]
            f.write(str(s) + " | " + str(item_name) +  " | " + str(item[1]) + "\n")
        f.write("\n")

#The code below allows us to generate rating for users with ID > 2660.
# ratingsTestDF = ratingsDF.filter(col("studentId") >= 2660)
# filteredUserIdDF = ratingsTestDF.drop("classId", "className", "rating").distinct().orderBy(col("studentId"))

# userIdList = []
# for userIdRow in  filteredUserIdDF.collect():
#     userIdList.append(userIdRow[0])
# print(userIdList)

# # userIdDF = spark.createDataFrame(userRdd)
# userDf = spark.createDataFrame([(uid,) for uid in userIdList], ['studentId'])
#
# userRecs = model.recommendForUserSubset(userDf, 10)
# #
# userRecs.show(10)
#
#
# # users = ratingsDataFrame.select(als.getUserCol()).distinct().limit(5)
# # userRecs = model.recommendForUserSubset(users, 10)
# print(f"Model Training Duration {time.time()-start} seconds")
# # userRecs.show(5, truncate=False, vertical=True)
#
# userRecs = model.recommendForUserSubset(ratingsDataFrame.filter(col("studentId") == userId), 10)
# # recommendedCoursesCFList = []
# # for userRecommendationRow in userRecs.collect():
# #     recommendedCoursesCFList = []
# #     user_id = userRecommendationRow[0]
# #     s = user_id
# #     for item in userRecommendationRow[1]:
# #         item_name = training.filter(col("classId") == item[0]).collect()[0][2]
# #         recommendedCoursesCFList.append(item_name)
# #     print(str(recommendedCoursesCFList))
#
# # s = ""
# # with open("classRecommendationsCF.txt", "w") as f:
# #     for rec in userRecs.collect():
# #         user_id = rec[0]
# #         s = user_id
# #         for item in rec["recommendations"]:
# #             item_name = ratingsTestDF.filter(col("classId") == item[0]).collect()[0][2]
# #             f.write(str(s) + " | " + str(item_name) +  " | " + str(item[1]) + "\n")
# #         f.write("\n")

Model Training Duration 3.0755138397216797 seconds
+---------+--------------------+
|studentId|     recommendations|
+---------+--------------------+
|        0|[{5327, 4.480903}...|
+---------+--------------------+



In [29]:
comparisonCFDf = pd.DataFrame([[allRequisitesList, recommendedCoursesCFList]], columns=['requisites', 'RecommendedCourses'])

comparisonCFDf = spark.createDataFrame(comparisonCFDf)
calculate_evaluation_metrics(comparisonCFDf, 1.5).select("F1-score").agg(avg('F1-score')).show(20)
calculate_evaluation_metrics(comparisonCFDf, 1.0).select("F1-score").agg(avg('F1-score')).show(20)
calculate_evaluation_metrics(comparisonCFDf, 0.5).select("F1-score").agg(avg('F1-score')).show(20)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+--------------------+
|       avg(F1-score)|
+--------------------+
|0.035681610247026534|
+--------------------+

+--------------------+
|       avg(F1-score)|
+--------------------+
|0.047244094488188976|
+--------------------+

+-------------------+
|      avg(F1-score)|
+-------------------+
|0.09554140127388536|
+-------------------+



In [30]:
rmse = evaluator.evaluate(predictions)
print("RMSE: " + str(rmse))

RMSE: 1.0539415404712085


In [31]:
print(f"Nb of partitions Collaborative Filtering: {userRecs.rdd.getNumPartitions()}")

Nb of partitions Collaborative Filtering: 1
