In [94]:
import os.path
import pickle
import random
import re

import pandas as pd
from pyspark import Row, SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, regexp_replace, split, col, desc, collect_list, substring, avg, collect_set, length
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import size, array_intersect, when
from pyspark.sql.types import StringType, IntegerType, FloatType, ArrayType, DoubleType
from pyspark.sql.utils import AnalysisException

# scikit-learn's bag-of-words (term count) vectorizer
from sklearn.feature_extraction.text import CountVectorizer


In [3]:
def init_spark():
    """Return a SparkSession for this notebook.

    The session is obtained with ``getOrCreate()``, so an already running
    session is reused instead of starting a second one.
    """
    builder = SparkSession.builder.appName("Python Spark SQL basic example")
    builder = builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()

In [4]:
# Session handle used by the later cells to read the CSV and register UDFs.
spark = init_spark()
# Manual session reset, kept for reference:
# spark.stop()
# spark = SparkSession.builder.getOrCreate()

In [5]:
# filename = "CU_SR_OPEN_DATA_CATALOG.csv"
# df = spark.read.csv(filename, header=True, mode="DROPMALFORMED")
# df = df.drop('index',"Course ID")
# df = df.na.drop()
# df = df.withColumn("Subject_and_Catalog", concat_ws(" ", "Subject", "Catalog" ))
# df = df.withColumn('concatenated_columns',
#                    concat_ws(" ", "Subject", "Catalog", "Career"))

In [40]:
# Load the course catalog and derive a per-course key column.
filename = "CATALOG.csv"
description_df = (
    spark.read.csv(filename, header=True, mode="DROPMALFORMED")
    .drop("index", "Course ID")
    # Null rows are removed while "Website" and "Key" are still present,
    # so a null in either of those columns also discards the row.
    .dropna()
    .drop("Website", "Key")
    .withColumn(
        "Subject_and_Catalog_Degree",
        concat_ws(" ", "Course code", "Course number", "Degree"),
    )
    # Keep only rows whose course code is longer than one character.
    .filter(length("Course code") > 1)
)
# description_df = description_df.select("Subject_and_Catalog", "Description")

In [53]:
# Show the column names and types of the cleaned catalog frame.
description_df.printSchema()

root
 |-- Faculty: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Program: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Degree: string (nullable = true)
 |-- Course code: string (nullable = true)
 |-- Course number: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Metadata: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Subject_and_Catalog_Degree: string (nullable = false)



In [29]:
# joined_df = description_df.join(df, df.Subject_and_Catalog == description_df.Subject_and_Catalog, how="full")

In [None]:
# Preview the first 20 catalog rows.
description_df.show(20)

In [9]:
# from pyspark.ml.feature import StopWordsRemover
# remover = StopWordsRemover(inputCol="Description", outputCol="filtered")
# remover.transform(df).show(truncate=False)

In [57]:
# Build TF-IDF feature vectors from the course descriptions.

# Number of hash buckets for the term-frequency vectors.
# NOTE(review): 20 buckets is very small for free text, so many distinct words
# will share a bucket; consider raising it if similarities look too coarse.
NUM_HASH_FEATURES = 20

# Split each description into word tokens.
tokenizer = Tokenizer(inputCol="Description", outputCol="CourseWords")
wordsData = tokenizer.transform(description_df)

# Hash the tokens into fixed-length term-frequency vectors.
# Alternatively, CountVectorizer can also be used to get term frequency vectors.
hashingTF = HashingTF(
    inputCol="CourseWords",
    outputCol="rawFeatures",
    numFeatures=NUM_HASH_FEATURES,
)
featurizedData = hashingTF.transform(wordsData)

# Rescale the raw term frequencies by inverse document frequency.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# A DataFrame has no `map` method, so go through the underlying RDD to get
# each row's feature vector as a dense array (lazy; nothing runs until an action).
features = rescaledData.select("features").rdd.map(
    lambda row: row["features"].toArray()
)
results = rescaledData

In [86]:
# Preview the first 10 rows, including the tokenized words and feature vectors.
results.show(10)

+-------+--------------------+--------------------+-------------+----------+-----------+-------------+--------------------+--------------------+-----------+-----------------+--------------------------+--------------------+--------------------+--------------------+
|Faculty|          Department|             Program|        Level|    Degree|Course code|Course number|               Title|         Description|   Metadata|             Type|Subject_and_Catalog_Degree|         CourseWords|         rawFeatures|            features|
+-------+--------------------+--------------------+-------------+----------+-----------+-------------+--------------------+--------------------+-----------+-----------------+--------------------------+--------------------+--------------------+--------------------+
|    FAS|             History|             History|    Any level|Any degree|       HIST|          498|Critical Museolog...|International Gra...|    Courses|Course-integrated|       HIST 498 Any degree|[int

In [101]:
def _vector_indices(vector):
    """Return the stored indices of a sparse vector as a list of ints.

    Returns None for a null input so the resulting column stays null
    instead of failing the task.
    """
    if vector is None:
        return None
    return vector.indices.tolist()


# Declare the return type explicitly: without it `udf` defaults to StringType
# and the column would hold a string instead of an array of integers.
extract_indices_udf = udf(_vector_indices, ArrayType(IntegerType()))



In [103]:
# Add a "values" column holding the stored indices of each feature vector.
indices_column = extract_indices_udf(col("features"))
df = results.withColumn("values", indices_column)


In [104]:
# Compare each feature vector with the indices extracted from it (untruncated).
df.select("features","values").show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                 |values                                                         |
+-------------------------------------------------------------------------------

In [55]:
# Show the schema after the tokenizer, HashingTF and IDF columns were added.
results.printSchema()

root
 |-- Faculty: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Program: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Degree: string (nullable = true)
 |-- Course code: string (nullable = true)
 |-- Course number: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Metadata: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Subject_and_Catalog_Degree: string (nullable = false)
 |-- CourseWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors as a Python float.

    Both arguments must provide ``dot(other)`` and ``norm(p)``.

    If either vector has zero length the similarity is undefined; 0.0 is
    returned in that case instead of NaN or a division error, so that such
    pairs are treated as "not similar" by later threshold filters.
    """
    denominator = vec1.norm(2) * vec2.norm(2)
    if denominator == 0:
        return 0.0
    return float(vec1.dot(vec2) / denominator)

In [None]:
# Pairwise cosine similarity between the TF-IDF vectors of all courses.

# Register the similarity function once. The returned handle can be used with
# the DataFrame API, and the registered name is callable from SQL expressions.
cosine_similarity_udf = spark.udf.register(
    "cosine_similarity_udf", cosine_similarity, DoubleType()
)

# Keep only what the comparison needs before pairing rows up.
# NOTE(review): "Subject_and_Catalog_Degree" is used as the course identifier
# and "features" as the vector to compare -- confirm these are the intended columns.
course_vectors = results.select(
    col("Subject_and_Catalog_Degree").alias("code"),
    col("features"),
)

# CrossJoin the DataFrame with itself to get all pairs of rows
df_pairs = course_vectors.alias("a").crossJoin(course_vectors.alias("b"))

# Calculate the cosine similarity for each pair of rows
df_similarity = df_pairs.selectExpr(
    "a.code as code1",
    "b.code as code2",
    "cosine_similarity_udf(a.features, b.features) as similarity"
)

# It won't display the CosineSimilarity for identical course codes
# And won't display redundant rows (ex: COMP352 | COMP346 | 0.4
#                                       COMP346 | COMP352 | 0.4 )
df_similarity = df_similarity.filter(col("code1") < col("code2"))

# Minimum threshold for course similarity. Spark orders NaN above every
# number, so NaN would pass a plain "> 0" test; exclude it explicitly.
df_similarity = df_similarity.filter("similarity > 0 AND NOT isnan(similarity)")
df_similarity = df_similarity.orderBy('similarity', ascending=False)