In [6]:
import csv
import re
import time
from operator import add
# from io import StringIO
# import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import rank, col
from pyspark.sql.types import *
from pyspark.sql.window import Window
 
# Create the Spark session (getOrCreate reuses an already running one) and
# keep a handle on its SparkContext for the RDD API.
spark = (
    SparkSession.builder
    .appName("cite")
    .getOrCreate()
)
sc = spark.sparkContext

# Timing log written to by the cells below; append mode keeps the
# measurements of earlier runs in the same file.
execution_time_file = open('execution_time_file', 'a')

In [7]:
# Parse the vocabulary part of a line from the file papers_vocab.txt.
# Entries are separated by whitespace; each entry has the form "<key>:<count>".
def parse_papers_count(line):
    """Turn a string of ``key:count`` entries into a ``{key: count}`` dict.

    Args:
        line: whitespace-separated ``key:count`` entries, e.g. ``"12:3 40:1"``.
            ``None`` or an empty string is accepted.

    Returns:
        dict mapping each key (str) to its count (int); empty for empty input.

    Raises:
        ValueError: if an entry does not contain exactly one ':' or its count
            is not an integer.
    """
    if not line:
        return dict()
    papersCount = dict()
    # split() without an argument drops leading/trailing and repeated
    # whitespace, so stray blanks or a trailing newline do not produce empty
    # entries that would fail to unpack below.
    for pcRaw in line.split():
        paper, count = pcRaw.split(':')
        papersCount[paper] = int(count)
    return papersCount

# Parse a line from the file users_libraries.txt
# semi-colon to separate user hash id with library, comma to separate the IDs in the library.
def parse_users_libraries(line):
    """Split one ``<user_hash>;<id>,<id>,...`` line into its parts.

    Args:
        line: a single line of users_libraries.txt.

    Returns:
        ``(user_hash, [paper_id, ...])`` with the paper ids converted to int,
        or ``None`` when ``line`` is empty. A user without any listed paper
        yields an empty list.

    Raises:
        ValueError: if the line does not contain exactly one ';' or a
            non-blank id is not an integer.
    """
    if not line:
        return

    userHash, libraryRAW = line.split(';')
    # Skip blank tokens so that an empty library ("hash;") or a trailing comma
    # does not end up in int(''), which raises ValueError.
    library = [int(paper_id) for paper_id in libraryRAW.split(',') if paper_id.strip()]
    return userHash, library


# Parse a line from the file papers.csv. Comma separated. Each line has 15 fields.
# The first is paper_id, the last is 'abstract' of a paper.
def parse_papers(line):
    """Extract ``(paper_id, abstract)`` from one CSV line of papers.csv.

    The line is parsed with the ``csv`` module so that commas inside quoted
    fields do not split a field.

    Args:
        line: a single line of papers.csv.

    Returns:
        ``(paper_id, abstract)`` as strings, taken from the first and the last
        field of the line, or ``None`` when the line is empty or contains
        nothing but NUL characters.
    """
    if not line:
        return
    # NUL characters are removed before parsing; the csv reader of older
    # Python versions rejects lines that contain them.
    papersInfo = csv.reader([line.replace("\0", "")], delimiter=',', quoting=csv.QUOTE_MINIMAL)
    papersInfoList = next(papersInfo)
    if not papersInfoList:
        # Nothing was left after stripping NULs: treat it like an empty line
        # instead of failing on the index access below.
        return
    # paper_id, abstract (the last element in the list)
    return papersInfoList[0], papersInfoList[-1]

# Exercise 6.2 Loading the data into an RDD
Load the dataset carrying out the following steps:
* `userRatingsRDD` : create a pair RDD from <i>users_libraries.txt</i> using the user hash as the key and the liked paper(s) (citeulike doc id) as the value(s).
* `paperTermsRDD` : create a pair RDD from <i>papers.csv</i> using the citeulike doc id as the key and the words contained in the abstract as the value(s)

In [8]:
# NOTE: textFile/map/filter are lazy Spark transformations, so the durations
# logged in this cell cover only the definition of the RDDs; the files are
# actually read when a later action (count, save, ...) runs.
start_time = time.time()
# load user libraries
userRatingsRDD = sc.textFile('citeulike/users_libraries.txt')
# key - user_hash id, values - list of paper_ids
# parse_users_libraries returns None for blank lines; drop those so that every
# record is a (user_hash, library) pair for the pair-RDD operations that follow.
userRatingsRDD = userRatingsRDD.map(parse_users_libraries).filter(lambda x: x is not None)

execution_time_file.write('Loading of userRatingsRDD: --- %s seconds --- \n' % (time.time() - start_time))

start_time = time.time()
# load paper terms
paperTermsRDD = sc.textFile('citeulike/papers.csv')

# key - paper_id, value - abstract
# (lines for which parse_papers returns None are dropped)
allPaperTermsRDD = paperTermsRDD.map(parse_papers).filter(lambda x: x is not None)

# filter empty abstracts
# Compare by value: `is not ''` tests object identity, which is not a reliable
# way to detect an empty string.
paperTermsRDD = allPaperTermsRDD.filter(lambda x: x[1] != '')

# key - paper_id, values - list of words
paperTermsRDD = paperTermsRDD.map(lambda x: (int(x[0]), x[1].split(' ')))

execution_time_file.write('Loading of paperTermsRDD: --- %s seconds --- \n' % (time.time() - start_time))

63

## Invalid Solution

In [4]:
# Invalid solution, kept for reference only: loading the papers into a DataFrame
# and then converting it to an RDD is inappropriate.
# The snippet is wrapped in a string literal, so none of it is executed; the
# cell merely evaluates to that string.
# Provide schema while loading papers
'''papersSchema = StructType([
    #  name, dataType, nullable
    StructField("paper_id", IntegerType(), False),
    StructField("type", StringType(), True),
    StructField("journal", StringType(), True),
    StructField("book_title", StringType(), True),
    StructField("series", StringType(), True),
    StructField("publisher", StringType(), True),
    StructField("pages", StringType(), True),
    StructField("volume", StringType(), True),
    StructField("number", StringType(), True),
    StructField("year", StringType(), True),
    StructField("month", StringType(), True),
    StructField("postedat", StringType(), True),
    StructField("address", StringType(), True),
    StructField("title", StringType(), True),
    StructField("abstract", StringType(), True)
])

paperTerms = spark.read.csv("citeulike/papers.csv", header = False, schema = papersSchema)
# remove empty abstracts
paperTerms = paperTerms.filter(paperTerms.abstract.isNotNull())
# transform to RDD
paperTermsRDD = paperTerms.select('paper_id' ,'abstract').rdd'''

'papersSchema = StructType([\n    #  name, dataType, nullable\n    StructField("paper_id", IntegerType(), False),\n    StructField("type", StringType(), True),\n    StructField("journal", StringType(), True),\n    StructField("book_title", StringType(), True),\n    StructField("series", StringType(), True),\n    StructField("publisher", StringType(), True),\n    StructField("pages", StringType(), True),\n    StructField("volume", StringType(), True),\n    StructField("number", StringType(), True),\n    StructField("year", StringType(), True),\n    StructField("month", StringType(), True),\n    StructField("postedat", StringType(), True),\n    StructField("address", StringType(), True),\n    StructField("title", StringType(), True),\n    StructField("abstract", StringType(), True)\n])\n\npaperTerms = spark.read.csv("citeulike/papers.csv", header = False, schema = papersSchema)\n# remove empty abstracts\npaperTerms = paperTerms.filter(paperTerms.abstract.isNotNull())\n# transform to RDD\

# Exercise 6.3 Joining Collections
Implement a simple function that computes for each user the top-k most frequent words appearing in
the papers she likes. Exclude the stop words listed in <i>stopwords_en.txt</i>.
Store the results into a file which contains in each line the user hash and the list of her retrieved words
sorted by frequency:
> user hash, word k, word k-1, word k-2,..., word 1

In [5]:
# load and broadcast the stopwords
# Read inside a `with` block so the file handle is closed as soon as the list
# has been built.
with open('citeulike/stopwords_en.txt') as stopwords_file:
    stopWords = [line.rstrip('\n') for line in stopwords_file]
# stopWords stays a list (one entry per line of the file); the broadcast copy
# is a frozenset because the RDD job tests every single word for membership,
# which is a hash lookup on a set but a linear scan on a list.
stopWordsBroadcast = sc.broadcast(frozenset(stopWords))

In [9]:
start_time = time.time()
# number of most frequent words kept per user
TOP_K = 10

# key - user_hash_id, value - paper_id
userRatingRDD = userRatingsRDD.flatMapValues(lambda x: x)

# Exchange key and value. The key becomes paper_id, the value - user_hash_id
ratingUserRDD = userRatingRDD.map(lambda x: (x[1], x[0]))

# key - paper_id, value - a word
paperTermRDD = paperTermsRDD.flatMapValues(lambda x: x)
# remove stop words before joining
paperTermRDD = paperTermRDD.filter(lambda x: x[1] not in stopWordsBroadcast.value)

# The join yields (paper_id, (user_hash_id, word)). The inner pair is used
# directly as a composite tuple key, which avoids gluing both parts into one
# string and splitting that string apart again afterwards.
# key - (user_hash_id, word), value - 1
userWords = ratingUserRDD.join(paperTermRDD).map(lambda x: (x[1], 1))
countedUserWords = userWords.reduceByKey(add)

# key - user_hash_id, value - [(word, word_count)]
countedUserWords = countedUserWords.map(lambda x: (x[0][0], [(x[0][1], x[1])]))
# concatenate the single-element lists into one list of (word, count) per user
countedUserWords = countedUserWords.reduceByKey(lambda a, b: a + b)
# most frequent words first, truncated to TOP_K entries
countedUserWords = countedUserWords.map(lambda x: (x[0], sorted(x[1], key=lambda y: y[1], reverse=True)[:TOP_K]))
# Everything above is a lazy transformation, so this duration covers only the
# definition of the pipeline.
execution_time_file.write('Preparation and joining of RDD: --- %s seconds --- \n' % (time.time() - start_time))

# save in a file
# saveAsTextFile is the action that triggers the whole computation, so the
# duration logged below includes the join and the aggregation.
start_time = time.time()
countedUserWords.saveAsTextFile("user_top_10_rdd")
execution_time_file.write('Save joined RDD in a file: --- %s seconds --- \n' % (time.time() - start_time))

63

# Exercise 6.4 Basic Analysis for Recommender Systems
Retrieve the following information:
* number of (distinct) user, number of (distinct) items, and number of ratings
* min number of ratings a user has given
* max number of ratings a user has given
* average number of ratings of users
* standard deviation for ratings of users
* min number of ratings an item has received
* max number of ratings an item has received
* average number of ratings of items
* standard deviation for ratings of items

In [10]:
start_time = time.time()
# number of users
usersCount = userRatingsRDD.count()

# number of items - allPaperTermsRDD (papers terms without filtering of empty abstracts)
itemsCount = allPaperTermsRDD.count()

# number of ratings
# ratingUserRDD -> key - paper_id, value - user_hash_id (one record per rating)
ratingsPerUserCount = ratingUserRDD.count()

# userRatingsRDD -> key - user_hash_id, values - list of paper_ids
# count number of liked documents per user
ratingsCountPerUser = userRatingsRDD.map(lambda x: len(x[1]))
# stats() collects min, max, mean and (population) standard deviation in one
# pass over the data instead of running a separate job for each of them.
userRatingStats = ratingsCountPerUser.stats()
# The counts are integers; int() keeps them integers even if the stat counter
# tracked min/max as floats.
minRatingsPerUser = int(userRatingStats.min())
maxRatingsPerUser = int(userRatingStats.max())
avgRatingsPerUser = userRatingStats.mean()
stdRatingsPerUser = userRatingStats.stdev()


# ratingUserRDD -> key - paper_id, value - user_hash_id
# count the number of users that rated each item; reduceByKey sums the ones per
# paper_id without first materialising the group of all users of an item
ratingsCountPerItem = ratingUserRDD.map(lambda x: (x[0], 1)).reduceByKey(add).values()
itemRatingStats = ratingsCountPerItem.stats()
minRatingsPerItem = int(itemRatingStats.min())
maxRatingsPerItem = int(itemRatingStats.max())
avgRatingsPerItem = itemRatingStats.mean()
stdRatingsPerItem = itemRatingStats.stdev()

execution_time_file.write('Basic analysis over RDDs: --- %s seconds --- \n' % (time.time() - start_time))

62

# Exercise 6.5 Loading the dataset into Data Frames
Loading the dataset into Data Frames, leveraging the structure of the data. The result should be a database
which contains one table for each file (you are free to choose the schema of the table) 

In [11]:
# load papers
# provide schema while loading papers
start_time = time.time()
# Every column after paper_id is read as a nullable string; the names are
# listed in the order in which the fields appear in the file.
paperStringColumns = [
    "type", "journal", "book_title", "series", "publisher", "pages", "volume",
    "number", "year", "month", "postedat", "address", "title", "abstract",
]
papersSchema = StructType(
    #  name, dataType, nullable
    [StructField("paper_id", IntegerType(), False)]
    + [StructField(column, StringType(), True) for column in paperStringColumns]
)

papers = spark.read.csv("citeulike/papers.csv", header = False, schema = papersSchema)

execution_time_file.write('Loading of papers into DF: --- %s seconds --- \n' % (time.time() - start_time))

start_time = time.time()
# load user libraries
usersLibrariesSchema = StructType([
    #  name, dataType, nullable
    StructField("user_hash_id", StringType(), False),
    StructField("user_library", StringType(), False)
])

users_libraries = spark.read.csv('citeulike/users_libraries.txt', sep = ";", header = False, schema = usersLibrariesSchema)
# key - user_hash, value - list of doc ids (the ids stay strings after the split)
users_libraries = users_libraries.selectExpr('user_hash_id', 'split(user_library,",") AS user_library')

# This duration belongs to the users-libraries load; the label used to say
# "papers terms", which mislabelled the entry in the timing log.
execution_time_file.write('Loading of users libraries into DF: --- %s seconds --- \n' % (time.time() - start_time))

69

# Exercise 6.6 Tasks on top of DataFrames
Solve the tasks 6.3 and 6.4 again using DataFrames instead of RDDs. Record the execution times for each
task and data model. Is there any noticeable performance difference between RDDs and DataFrames?
Justify your answer.


Retrieve the following information:
* number of (distinct) user, number of (distinct) items, and number of ratings
* min number of ratings a user has given
* max number of ratings a user has given
* average number of ratings of users
* standard deviation for ratings of users
* min number of ratings an item has received
* max number of ratings an item has received
* average number of ratings of items
* standard deviation for ratings of items

In [12]:
# 6.4 Basic Analysis
start_time = time.time()

# number of users, followed by the number of papers
print(users_libraries.count())
print(papers.count())

# one row per user, holding the number of items in that user's library
library_size = F.size(F.col("user_library"))
ratings = users_libraries.select(library_size.alias("ratings"))

# total number of ratings over all users
ratings.agg(F.sum("ratings")).show()

# count, mean, stddev, min and max of the ratings per user
ratings.describe("ratings").show()

# ratings per item: one row per (user, item) pair, then count the rows per item
user_items = users_libraries.select(F.explode("user_library").alias("items"))
items_count = user_items.groupBy("items").count()
items_count.describe("count").show()

execution_time_file.write('Basic Analysis over DF: --- %s seconds --- \n' % (time.time() - start_time))

28416
172079
+------------+
|sum(ratings)|
+------------+
|      828481|
+------------+

+-------+------------------+
|summary|           ratings|
+-------+------------------+
|  count|             28416|
|   mean|29.155440596846848|
| stddev| 81.17660451011594|
|    min|                 1|
|    max|              1922|
+-------+------------------+

+-------+-----------------+
|summary|            count|
+-------+-----------------+
|  count|           172079|
|   mean| 4.81453867119172|
| stddev|5.477818208917285|
|    min|                3|
|    max|              924|
+-------+-----------------+



60

In [13]:
# 6.3 Joining
start_time = time.time()

# number of most frequent words kept per user
top_k_words = 10

# Work on a separate frame and leave `papers` untouched: the null-count cell
# further down still needs all of its columns.
papers_with_abstract = papers.filter(papers.abstract.isNotNull()).select('paper_id', 'abstract')
users_papers = users_libraries.select('user_hash_id', F.explode(users_libraries.user_library).alias('paper_id'))
user_abstracts = users_papers.join(papers_with_abstract, users_papers.paper_id == papers_with_abstract.paper_id).select('user_hash_id', 'abstract')

# key - user_hash, value a word
user_words = user_abstracts.selectExpr('user_hash_id', 'explode(split(abstract," ")) AS word')

# remove stop words
user_words = user_words.filter(~user_words.word.isin(stopWords))

# for each user and each word, count how many times it occurs
user_words = user_words.groupBy('user_hash_id', 'word').agg(F.count("*").alias('word_occurance'))
# most frequent word first; ties are broken by the word itself so that the
# numbering below is deterministic
user_words_window = Window.partitionBy('user_hash_id').orderBy(col('word_occurance').desc(), col('word'))

# take first 10 words per user
# row_number() numbers the rows 1..n without gaps or duplicates, so at most
# top_k_words rows survive per user; rank() gives tied words the same rank and
# could therefore let more than that through.
ranked_user_words = user_words.select('user_hash_id', 'word', F.row_number().over(user_words_window).alias('rank')).filter(col('rank') <= top_k_words)
user_top_10_words = ranked_user_words.select('user_hash_id', 'word')

# format into <user_hash> [most frequent word, ..., least frequent word]
# The (rank, word) structs are sorted by rank before the words are extracted;
# collect_set on the bare words would not preserve the frequency order.
grouped_user_top_10_words = (
    ranked_user_words
    .groupBy('user_hash_id')
    .agg(F.sort_array(F.collect_list(F.struct('rank', 'word'))).alias('ranked_words'))
    .select('user_hash_id', col('ranked_words.word').alias('words'))
)

execution_time_file.write('Preparation and joining of DF: --- %s seconds --- \n' % (time.time() - start_time))


# save the result
# NOTE: write.save() uses the session's default data source (parquet unless
# spark.sql.sources.default is configured otherwise), not a plain text file.
start_time = time.time()
grouped_user_top_10_words.write.save("user_top_10_df")
execution_time_file.write('Saving the joined DFs: --- %s seconds --- \n' % (time.time() - start_time))

# last timing entry of the notebook: release the log file
execution_time_file.close()

In [None]:
# Another solution for removing the stop words is with join
#user_words = user_words.join(stopWords, user_words.word == stopWords._1, "left_outer")
#user_words = user_words.filter(stopWords._1.isNull())

# Additional calculations which are not part of the ex_sheet1

In [None]:
# load authors
authors = spark.read.csv("citeulike/authors.csv", header=True)

# load keywords
keywordsSchema = StructType([
    #  name, dataType, nullable
    StructField("citeulike_doc_id", IntegerType(), False),
    StructField("keyword", StringType(), False)
])

keywords = spark.read.csv("citeulike/keywords.csv", header=False, schema = keywordsSchema)

# load papers vocabularies
papers_vocab = sc.textFile("citeulike/papers_vocab.txt")
# remove the header
header = papers_vocab.first()
papers_vocab = papers_vocab.filter(lambda line: line != header)

# comma separates the doc id from the vocabulary entries
papers_vocab = papers_vocab.map(lambda k: k.split(","))
# The row values have to match the schema below: the doc id is converted to
# int, and parse_papers_count returns a {str: int} dict, which corresponds to
# a MapType column (not to an array of strings).
papers_vocab = papers_vocab.map(lambda x: (int(x[0]), parse_papers_count(x[1])))

papersVocSchema = StructType([
    #  name, dataType, nullable
    StructField("citeulike_doc_id", IntegerType(), False),
    StructField("vocabularies", MapType(StringType(), IntegerType(), False), True),
])
papers_vocab = papers_vocab.toDF(papersVocSchema)

# load vocabularies
vocabs = sc.textFile('citeulike/vocab.txt')
# tab separated fields; the column names and types are inferred by toDF()
vocabs = vocabs.map(lambda k: k.split("\t")).toDF()

In [None]:
# nullable features calculation
# For every column after paper_id, print the number of rows that have no value
# in it. The columns are visited in the order of the papers schema, one count
# per line.
nullable_columns = [
    "type", "journal", "book_title", "series", "publisher", "pages", "volume",
    "number", "year", "month", "postedat", "address", "title", "abstract",
]
for column_name in nullable_columns:
    missing_rows = papers.filter(papers[column_name].isNull())
    print(missing_rows.count())