# Advanced Database and Information Systems
## Project 1
### Sneha Ganganna (5579362), Aishwarya Dinni (5653414)

In [1]:
#To check if the spark has been installed 
#!pip install pyspark

In [2]:
import csv
import time
import pyspark
from pyspark import SparkConf
from pyspark.sql.types import *
from collections import Counter
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import countDistinct

In [3]:

# initialise Spark Session
sparkSession = SparkSession.builder.appName("exe1").getOrCreate()
sc = sparkSession.sparkContext


# Exercise 1. 2 (Loading the dataset into an RDD)

In [17]:


# Read users_libraries file into RDD
usersRatingsRDD = sc.textFile("users_libraries")
# Create Pair RDD for users_libraries Text File
usersRatingsRDD = sc.textFile("users_libraries.txt").map(lambda line: line.split(";")).map(lambda line: (line[0],list(map(int,line[1].split(",")))))


#Stopword Broadcast
stopWords = sc.textFile("stopwords_en.txt")
stopWordsBroadcast = sc.broadcast(stopWords.collect())


#PaperCsv RDD
def processingCsvFile(line):
    papers = csv.reader([line.replace("\0", "")], delimiter=',', quoting=csv.QUOTE_MINIMAL)
    paperList = next(papers)
    return paperList[0], paperList[14]

papersTermsRDD = sc.textFile("papers.csv")
papersTermsRDD = papersTermsRDD.map(processingCsvFile).filter(lambda x: (x[1] != "" and x[1] != " ")).map(lambda x: (int(x[0]),x[1].split(" ")))

# Join the two RDDs on the common key i.e. paper_id
papersTerms_userRatings = usersRatingsRDD.flatMapValues(lambda x: x).map(lambda x: (x[1],x[0])).join(papersTermsRDD)

# Print the result
papersTerms_userRatings.foreach(print)
papersTerms_userRatings.foreach(lambda x: print(x))


# Exercise 1. 3 (Joining collections)

In [16]:
start_time = time.time()

# Finding top 10 most frequent words
def FrequentWords(x):
    rList = Counter(x)
    FrequentWordsCount = CounterList.most_common(10)
    FrequentWords = [word for word, word_count in FrequentWordsCount]
    return FrequentWords

#to Remove the stopWords
def removeStopWords(List):
    requiredList = List.copy()
    for x in List:
        if ((x in stopWordsBroadcast.value) or x == "" or x == " "):
            requiredList.remove(x)
    return requiredList


# join the RDDs userLibraries and PapersRdd
usersLibraries_PapersRdd = usersRatingsRDD.flatMapValues(lambda x: x).map(lambda x: (x[1],x[0])).join(papersTermsRDD)
#restructure the RDD
usersLibraries_PapersRdd = usersLibraries_PapersRdd.map(lambda x: (x[1][0],x[1][1]))
#transformation of RDD to perform computations 
usersLibraries_PapersRdd   = usersLibraries_PapersRdd.flatMapValues(lambda x:x).groupByKey().mapValues(list)
#to remove the stop words from the RDD
#let Rdd be usersLibraries_PapersRdd
rddWithoutStopWords = usersLibraries_PapersRdd.mapValues(removeStopWords)

# now run the def frequentWords after the stop words are removed
frequentWordList = rddWithoutStopWords.mapValues(FrequentWords)

#wrtiting the above data into file
def CreateCsv(data):
    Data = data[0] + "," + (','.join(str(d) for d in data[1]))
    return Data

frequentWordListFile = frequentWordList.map(CreateCsv)
#frequentWordListFile.saveAsTextFile("result/Results")

end_time = time.time()

print("Execution Time taken to Join collections:", end_time - start_time, "in seconds")


Execution Time taken to Join collections: 0.03113842010498047 in seconds


# Exercise 1. 4 (Basic Analysis for Recommender Systems)

In [15]:
#to record execution time
start_time = time.time()

#a) Number of (distinct) user, number of (distinct) items, and number of ratings

#first apply flatMapVales() function for each kay-pair value in userRatingsRDD
usersRatingsRDDFMV =usersRatingsRDD.flatMapValues(lambda x:x)

numOfUsers = usersRatingsRDDFMV.keys().distinct().count()
print("Number of distinct users: ", numOfUsers)

numOfItems = usersRatingsRDDFMV.values().distinct().count()
print("Number of distinct items: ", numOfItems)

numOfRatings = usersRatingsRDDFMV.count()
print("Number of ratings: ", numOfRatings)

#b) Min number of ratings a user has given
minNumberOfRatings = usersRatingsRDD.map(lambda x: (x[0],len(x[1]))).map(lambda x: x[1]).min()
print("Minimum number of ratings a user has given: ", minNumberOfRatings)

#c)  Max number of ratings a user has given
maxNumberOfRatings = usersRatingsRDD.map(lambda x: (x[0],len(x[1]))).map(lambda x: x[1]).max()
print("Maximum number of ratings a user has given: ", maxNumberOfRatings)

#d)  Average number of ratings of users
avgNumberOfRatings = numOfRatings/numOfUsers
print("Average number of ratings a user has given: ", avgNumberOfRatings)

#e) Standard deviation for ratings of users
stdevRatingsOfUsers = (usersRatingsRDD.map(lambda x: (x[0],len(x[1]))).map(lambda x: x[1])).stdev()
print("Standard deviation of ratings for users",stdevRatingsOfUsers)
 
#f)Min number of ratings an item has received
minRatingsOfitem = usersRatingsRDDFMV.map(lambda x: (x[1],1)).reduceByKey(lambda x,y: x+y).map(lambda x: x[1]).min()
print("Minimum number of ratings an item has received: ", minRatingsOfitem)

#g) Max number of ratings an item has received
maxRatingsOfitem = usersRatingsRDDFMV.map(lambda x: (x[1],1)).reduceByKey(lambda x,y: x+y).map(lambda x: x[1]).max()
print("Maximum number of ratings an item has received: ", maxRatingsOfitem)

#h) Average number of ratings of items
avgRatingsOfitem= numOfRatings/numOfItems
print("Average number of ratings per item: ", avgRatingsOfitem)

#i) Standard deviation for ratings of items
stdevRatingsOfItems = usersRatingsRDDFMV.map(lambda x: (x[1],1)).reduceByKey(lambda x,y: x+y).map(lambda x: x[1]).stdev()
print("Standard deviation of ratings per item: ", stdevRatingsOfItems)

end_time = time.time()
print("The time taken for the Basic Analysis of Recommender Systems:", end_time-start_time,"seconds")


Number of distinct users:  28416
Number of distinct items:  172079
Number of ratings:  828481
Minimum number of ratings a user has given:  1
Maximum number of ratings a user has given:  1922
Average number of ratings a user has given:  29.155440596846848
Standard deviation of ratings for users 81.1751761366871
Minimum number of ratings an item has received:  3
Maximum number of ratings an item has received:  924
Average number of ratings per item:  4.81453867119172
Standard deviation of ratings per item:  5.477802292314525
The time taken for the Basic Analysis of Recommender Systems: 6.037190675735474 seconds


# Exercise 1. 5 (Loading the dataset into Data Frames)

In [7]:
# To define the schema of the CSV data
papersSchema = StructType([
    StructField("paper_id", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("journal", StringType(), True),
    StructField("bookـtitle", StringType(), True),
    StructField("series", IntegerType(), True),
    StructField("publisher", StringType(), True),
    StructField("pages", IntegerType(), True),
    StructField("volume", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", StringType(), True),
    StructField("postedat", FloatType(), True),
    StructField("address", StringType(), True),
    StructField("title", StringType(), True),
    StructField("abstract", StringType(), True)
    ])
papersCsvDf = sparkSession.read.csv("papers.csv", sep = ",", header = True, schema = papersSchema)
papersCsvDf = papersCsvDf.na.drop(subset=["abstract"]) # drops the null values



# To define the schema of the text data
userLibrariesSchema = StructType([
    StructField("user_hash_id",StringType(),True),
    StructField("user_library",StringType(),True)
])
usersLibrariesDf = sparkSession.read.csv("users_libraries.txt", sep = ";", header = True, schema = userLibrariesSchema)
# Select the columns using selectExpr() function
usersLibrariesDf = usersLibrariesDf.selectExpr("user_hash_id","split(user_library,',') AS user_library")

usersLibrariesDf.head()

papersCsvDf.head()

Row(paper_id=5842862, type='article', journal='molecular cell', bookـtitle=None, series=None, publisher='elsevier', pages=2, volume=35, number=6, year=2009, month='sep', postedat=None, address=None, title='how to choose a good scientific problem', abstract='choosing good problems is essential for being a good scientist. but what is a good problem, and how do you choose one? the subject is not usually discussed explicitly within our profession. scientists are expected to be smart enough to figure it out on their own and through the observation of their teachers. this lack of explicit discussion leaves a vacuum that can lead to approaches such as choosing problems that can give results that merit publication in valued journals, resulting in a job and tenure.')

# Exercise 1. 6 Tasks on top of DataFrames

In [14]:
#to record execution time
startTime = time.time()

#Explode UserLibrary
usersLibrariesExplode = usersLibrariesDf.select(usersLibrariesDf.user_hash_id,explode(usersLibrariesDf.user_library).alias("paper_id"))

#Join UserLibrary and PaperCsv
usersLibraries_PapersCsvDf = papersCsvDf.join(usersLibrariesExplode,papersCsvDf\
                                              .paper_id == usersLibrariesExplode.paper_id, how="inner")\
                                            .select(usersLibrariesExplode.user_hash_id,usersLibrariesExplode.paper_id,papersCsvDf.abstract)


#Removing stop words
# Split the abstract column into an array of words
usersLibraries_PapersCsvDf = usersLibraries_PapersCsvDf.withColumn("abstract_words", split(usersLibraries_PapersCsvDf.abstract, " "))

# Explode the abstract_words array
usersLibraries_PapersCsvDf = usersLibraries_PapersCsvDf.select(usersLibraries_PapersCsvDf.user_hash_id, explode(usersLibraries_PapersCsvDf.abstract_words).alias("abstract"))
uselessWords = ['',' ','"']
#just to make it easy -  df=usersLibraries_PapersCsvDf
dfWithoutStopWords = usersLibraries_PapersCsvDf[~usersLibraries_PapersCsvDf["abstract"].isin(stopWordsBroadcast.value)]
dfWithoutStopWords = dfWithoutStopWords[~dfWithoutStopWords["abstract"].isin(uselessWords)]

#Finding top 10 most frequent words
dfWithoutStopWordsCount = dfWithoutStopWords.groupBy("user_hash_id","abstract").count().withColumnRenamed("count", "word_count")
usersWords = Window.partitionBy(dfWithoutStopWordsCount.user_hash_id).orderBy(col("word_count").desc())
dfWithoutStopWordsRank = dfWithoutStopWordsCount.withColumn("word_rank",rank().over(usersWords))
topFrequentWordsPerUserDf = dfWithoutStopWordsRank.filter(dfWithoutStopWordsRank["word_rank"]<11)
groupTop10FrequentWordsPerUserDf = topFrequentWordsPerUserDf.groupBy("user_hash_id").agg(collect_list("abstract")).withColumnRenamed("collect_list(abstract)", "abstract_word_list")

#writing top 10 most frequent words of each user to file
#groupTop10FrequentWordsPerUserDf.write.save("Top10WordsForEachUserDF")

endTime = time.time()

print("Execution Time taken to Join collections:", endTime-startTime, "seconds")

Execution Time taken to Join collections: 0.34844231605529785 seconds


In [13]:
#to record execution time
startTime = time.time()

#a) Number of (distinct) user, number of (distinct) items, and number of ratings
numOfDistinctUsersDf = str(usersLibrariesExplode.select(countDistinct("user_hash_id")).collect()[0][0])
print("Number of (distinct) user:" ,numOfDistinctUsersDf)

numOfDistinctItemsDf = str(usersLibrariesExplode.select(countDistinct("paper_id")).collect()[0][0])
print("Number of (distinct) items:" ,numOfDistinctItemsDf)

numOfRatingsDf = usersLibrariesExplode.count()
print("Number of ratings:" ,numOfRatingsDf)


ratingsListDf = usersLibrariesExplode.groupBy("user_hash_id").count().withColumnRenamed("count","no_of_items")
ratingsListDf = ratingsListDf.describe("no_of_items")

#b) Min number of ratings a user has given
minNumOfRatingsDf = str(ratingsListDf.filter("summary == 'min'").collect()[0][1])
print("Minimum number of ratings a user has given: ", minNumOfRatingsDf)

#c)  Max number of ratings a user has given
maxNumOfRatingsDf = str(ratingsListDf.filter("summary == 'max'").collect()[0][1])
print("Min number of ratings a user has given:",maxNumOfRatingsDf)

#d)  Average number of ratings of users
avgNumOfRatings = int(numOfRatingsDf)/int(numOfDistinctUsersDf)
print("Average number of ratings a user has given: ", avgNumOfRatings)

#e) Standard deviation for ratings of users
stdevRatingsOfUsers = str(ratingsListDf.filter("summary == 'stddev'").collect()[0][1])
print("Standard deviation of ratings for users",stdevRatingsOfUsers)




ratingsListByPaperIdDf = usersLibrariesExplode.groupBy("paper_id").count().withColumnRenamed("count","no_of_ratings")
ratingsListByPaperIdDf = ratingsListByPaperIdDf.describe("no_of_ratings")


#h) Average number of ratings of items
minNumOfRatingItemGotDf = str(ratingsListByPaperIdDf.filter("summary == 'min'").collect()[0][1])
print("Min number of ratings an item has received:",minNumOfRatingItemGotDf)

#g) Max number of ratings an item has received
maxNumOfRatingItemGotDf = str(ratingsListByPaperIdDf.filter("summary == 'max'").collect()[0][1])
print("Max number of ratings an item has received:",maxNumOfRatingItemGotDf)

#h) Average number of ratings of items
avgNumOfRatingOfItemsDf = int(numOfRatingsDf)/int(numOfDistinctItemsDf)
print("Average number of ratings of items:",avgNumOfRatingOfItemsDf)

#i) Standard deviation for ratings of items
stdevRatingsOfItemsDf = str(ratingsListByPaperIdDf.filter("summary == 'stddev'").collect()[0][1])
print("Standard deviation for ratings of items: ", stdevRatingsOfItemsDf)

endTime = time.time()

print("The time taken for the fundamental analysis of recommender systems:", endTime-startTime,"seconds")


Number of (distinct) user: 28415
Number of (distinct) items: 172079
Number of ratings: 828461
Minimum number of ratings a user has given:  1
Min number of ratings a user has given: 1922
Average number of ratings a user has given:  29.15576280133732
Standard deviation of ratings for users 81.17801478819557
Min number of ratings an item has received: 2
Max number of ratings an item has received: 924
Average number of ratings of items: 4.814422445504681
Standard deviation for ratings of items:  5.477832307606936
The time taken for the fundamental analysis of recommender systems: 3.0855443477630615 seconds
