In [1]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as Funct
from pyspark.sql.window import Window
import csv
import re

sparkSession = SparkSession.builder.appName("myApp").getOrCreate()
#Get default configurations
sparkSession.sparkContext._conf.getAll()
#Update default configurations
conf = sparkSession.sparkContext._conf.setAll([('spark.executor.memory', '16g')\
                                        , ('spark.app.name', 'Spark Updated Conf')\
                                        , ('spark.executor.cores', '12')\
                                        , ('spark.cores.max', '12')\
                                        , ('spark.driver.memory','16g')\
                                        ,('spark.driver.maxResultSize','16g')])
#Stop the current Spark Session
sparkSession.sparkContext.stop()
#Create a Spark Session
sparkSession = SparkSession.builder.config(conf=conf).getOrCreate()


## Exercise 1.2 (Loading the dataset into an RDD)
Loading the dataset in the following RDD:<br>
a) userRatingsRDD: pair RDD from **user_libraries.txt** using the user hash as the key and the liked paper(s) as value(s).<br>
b) paperTermsRDD: pair RDD from **papers.csv** using the paper_id as the key and the words contained in the abstract as the value(s).<br>

In [2]:
#Exercise 1.2
#Reading txt and csv files into RDD
userLibRDD = sparkSession.sparkContext.textFile("./mod_users_libraries.txt")
stopWordRDD = sparkSession.sparkContext.textFile("./stopwords_en.txt")
papersRDD = sparkSession.sparkContext.textFile("./papers.csv")


In [3]:
#Exercise 1.2
#Creating a pair RDD from user_libraries.txt using the user hash as key and liked paper(s) as value(s)
userRatingsRDD = userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))
#Displaying first 'n' elements of dataset
userLibRDD.unpersist()
userRatingsRDD.take(2)

[('28d3f81251d94b09735497477a5e4e02',
  '3929762,503574,5819422,4238883,5788061,462949,635215,635216,4810441,3481823,4165233,3366480,5984302,4238942,5490453,4636156,5996865,4194836,5828780,4450195'),
 ('d0c9aaa788153daeaf1f1538b3d46bbb',
  '2080631,6343346,5184704,7756088,2653863,6607628,4236212,1277953,226864,3140015,8806369,311570,5687747,767516,4781370,2841637,2445106,1959511,2688186,2363430,6614346,853030,5336762,4226226,239571,4089758,4140337,913868,7562861,3190274,2782576,12571584,2049617,5761055,5441098,3466838,2080691,1805577,7570111,5760287,2855355,3281547,1012525,3512183,678653')]

In [4]:
#Exercise 1.2
#converting the userRatingsRDD into key (user_hash_id) value (paper_id) pairs RDD
userRatingsList = []
for keyListVal in userRatingsRDD.collect():
    for val in keyListVal[1].split(','):
        userRatingsList.append((keyListVal[0],val))

userRatingsKVPairRDD = sparkSession.sparkContext.parallelize(userRatingsList)
userRatingsKVPairRDD.take(5)

[('28d3f81251d94b09735497477a5e4e02', '3929762'),
 ('28d3f81251d94b09735497477a5e4e02', '503574'),
 ('28d3f81251d94b09735497477a5e4e02', '5819422'),
 ('28d3f81251d94b09735497477a5e4e02', '4238883'),
 ('28d3f81251d94b09735497477a5e4e02', '5788061')]

In [5]:
#Exercise 1.2
#Creating a pair RDD from papers.csv using paper_id as key and abstract as value(s)
paperTermsRDD = papersRDD.map(lambda line: (line.split(',')[0], ",".join(line.split(',')[14:])))
papersRDD.unpersist()
paperTermsRDD.take(2)

[('80546',
  '"the genetic code has been regarded as arbitrary in the sense that the codon-amino acid assignments could be different than they actually are. this general idea has been spelled out differently by previous, often rather implicit accounts of arbitrariness. they have drawn on the frozen accident theory, on evolutionary contingency, on alternative causal pathways, and on the absence of direct stereochemical interactions between codons and amino acids. it has also been suggested that the arbitrariness of the genetic code justifies attributing semantic information to macromolecules, notably to {dna}. i argue that these accounts of arbitrariness are unsatisfactory. i propose that the code is arbitrary in the sense of jacques monod\'s concept of chemical arbitrariness: the genetic code is arbitrary in that any codon requires certain chemical and structural properties to specify a particular amino acid, but these properties are not required in virtue of a principle of chemistry. 

## Exercise 1.3 (Joining Collections)
Compute for each user the top-10 most frequent words appearing in the papers she likes. Exclude the stop words listed in **stopwords_en.txt**.<br>
Store the results into a file which contains in each line the user hash and the list of her retrieved words sorted by frequency (top 1 is the most frequent).

In [6]:
#Exercise 1.3
#Creating a broadcast variable holding stop words
stopWordBrdcast = sparkSession.sparkContext.broadcast(stopWordRDD.collect())
print(stopWordBrdcast.value[0:10])

['a', 'able', 'about', 'above', 'according', 'accordingly', 'across', 'actually', 'after', 'afterwards']


In [7]:
#Exercise 1.3
#Join the userRatingsKVPairRDD with the paperTermsRDD
#userRatingsKVPairRDD.paper_id = paperTermsRDD.paper_id
joinResultRDD = userRatingsKVPairRDD.map(lambda x: (x[1], x[0])).join(paperTermsRDD)
userRatingsKVPairRDD.unpersist()
joinResultRDD.take(5)


[('503574', ('28d3f81251d94b09735497477a5e4e02', '')),
 ('5788061',
  ('28d3f81251d94b09735497477a5e4e02',
   ' and its impact on {cmb} anomalies",')),
 ('4238942', ('28d3f81251d94b09735497477a5e4e02', '')),
 ('5490453', ('28d3f81251d94b09735497477a5e4e02', '')),
 ('5828780', ('28d3f81251d94b09735497477a5e4e02', ''))]

In [8]:
#Exercise 1.3
#function to concatenate list elements
def concatListElements(x):
    return ''.join(x)

#Create new RDD containing only user_hash_id and abstract from the result of the join
#Using groupByKey() and mapValues() to combine values for each key into a single list
transJoinResultRDD = joinResultRDD.map(lambda x: (x[1][0], x[1][1].strip()))\
                                .groupByKey()\
                                .mapValues(list)\
                                .mapValues(concatListElements)
joinResultRDD.unpersist()
transJoinResultRDD.take(2)

[('f05bcffe7951de9e5a32fff4a42eb088',
  'a protein domain database for functional characterization and annotation.","{prosite} consists of documentation entries describing protein domains, families and functional sites, as well as associated patterns and profiles to identify them. it is complemented by {prorule}, a collection of rules based on profiles and patterns, which increases the discriminatory power of these profiles and patterns by providing additional information about functionally and/or structurally critical amino acids. {prosite} is largely used for the annotation of domain features of {uniprotkb}/{swiss-prot} entries. among the 983 ({dna}-binding) domains, repeats and zinc fingers present in {swiss-prot} (release 57.8 of 22 september 2009), 696 ( approximately 70\\%) are annotated with {prosite} descriptors using information from {prorule}. in order to allow better functional characterization of domains, {prosite} developments focus on subfamily specific profiles and a new

In [9]:
#Exercise 1.3
#splitting the values of transJoinResultRDD every time a blank space is encountered
splitRDD = transJoinResultRDD.map(lambda x: (x[0], x[1].split(' ')))
transJoinResultRDD.unpersist()
splitRDD.take(2)

[('f05bcffe7951de9e5a32fff4a42eb088',
  ['a',
   'protein',
   'domain',
   'database',
   'for',
   'functional',
   'characterization',
   'and',
   'annotation.","{prosite}',
   'consists',
   'of',
   'documentation',
   'entries',
   'describing',
   'protein',
   'domains,',
   'families',
   'and',
   'functional',
   'sites,',
   'as',
   'well',
   'as',
   'associated',
   'patterns',
   'and',
   'profiles',
   'to',
   'identify',
   'them.',
   'it',
   'is',
   'complemented',
   'by',
   '{prorule},',
   'a',
   'collection',
   'of',
   'rules',
   'based',
   'on',
   'profiles',
   'and',
   'patterns,',
   'which',
   'increases',
   'the',
   'discriminatory',
   'power',
   'of',
   'these',
   'profiles',
   'and',
   'patterns',
   'by',
   'providing',
   'additional',
   'information',
   'about',
   'functionally',
   'and/or',
   'structurally',
   'critical',
   'amino',
   'acids.',
   '{prosite}',
   'is',
   'largely',
   'used',
   'for',
   'the',
   'a

In [10]:
#Exercise 1.3
#function to remove stop words from word list
def excludeStopWords(wordList):
    ls = list()
    for word in wordList:
#if word not in stop word list then add word in relevant words list
        if word not in stopWordBrdcast.value:
            ls.append(word)    
    return(ls)

#relevantWordRDD contains the user_hash_id and list of words not in the stopwords_en.txt
relevantWordRDD = splitRDD.mapValues(excludeStopWords)
splitRDD.unpersist()
relevantWordRDD.take(2)


[('f05bcffe7951de9e5a32fff4a42eb088',
  ['protein',
   'domain',
   'database',
   'functional',
   'characterization',
   'annotation.","{prosite}',
   'consists',
   'documentation',
   'entries',
   'describing',
   'protein',
   'domains,',
   'families',
   'functional',
   'sites,',
   'patterns',
   'profiles',
   'identify',
   'them.',
   'complemented',
   '{prorule},',
   'collection',
   'rules',
   'based',
   'profiles',
   'patterns,',
   'increases',
   'discriminatory',
   'power',
   'profiles',
   'patterns',
   'providing',
   'additional',
   'information',
   'functionally',
   'and/or',
   'structurally',
   'critical',
   'amino',
   'acids.',
   '{prosite}',
   'largely',
   'annotation',
   'domain',
   'features',
   '{uniprotkb}/{swiss-prot}',
   'entries.',
   '983',
   '({dna}-binding)',
   'domains,',
   'repeats',
   'zinc',
   'fingers',
   'present',
   '{swiss-prot}',
   '(release',
   '57.8',
   '22',
   'september',
   '2009),',
   '696',
   '(',
  

In [None]:
#Exercise 1.3
#counting the frequency of each individual words used by every user
#rearranging the result as user_hash_id as key and the tuple (word, count) as value
wordCountPerUserRDD = relevantWordRDD.flatMapValues(lambda x: x)\
                                        .map(lambda x: (x,1))\
                                        .reduceByKey(lambda a, b: a + b)
relevantWordRDD.unpersist()
wordCountPerUserRDD.take(10)


In [12]:
rearrangedRDD = wordCountPerUserRDD.map(lambda x: ((x[0][0], (x[0][1], x[1]))))
wordCountPerUserRDD.unpersist()
rearrangedRDD.take(10)

[('f05bcffe7951de9e5a32fff4a42eb088', ('database', 61)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('functional', 40)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('documentation', 4)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('families', 5)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('identify', 21)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('rules', 3)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('power', 8)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('{uniprotkb}/{swiss-prot}', 1)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('repeats', 1)),
 ('f05bcffe7951de9e5a32fff4a42eb088', ('70\\%)', 1))]

In [13]:
#Exercise 1.3
#Grouping by key and sorting the words in according to their respective word count in descending order
groupedSortedRDD = rearrangedRDD.groupByKey()\
                                .map(lambda x: (x[0], sorted(list(x[1]), key=lambda x: x[1], reverse=True)))
rearrangedRDD.unpersist()
groupedSortedRDD.take(2)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 27.0 failed 1 times, most recent failure: Lost task 1.0 in stage 27.0 (TID 30, localhost, executor driver): com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:183)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:149)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:183)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
	... 17 more


In [None]:
#Exercise 1.3
#Choosing the top ten words used by each user
topTenGroupRDD = groupedSortedRDD.map(lambda x: ([x[0]] ,x[1][0:10]))
groupedSortedRDD.unpersist()
topTenGroupRDD.take(3)

In [None]:
#Exercise 1.3
#Displaying the top ten words used by each user without the associated word count
finalResultRDD = topTenGroupRDD.mapValues(lambda x: [t[0] for t in x])
topTenGroupRDD.unpersist()
finalResultRDD.take(3)

In [None]:
#Exercise 1.3
#Saving the results as text file
#finalResultRDD.saveAsTextFile("output_files")

## Exercise 1.4 (Basic Analysis for Recommender Systems)
Basic analysis to get an idea of the characteristics of the dataset.

In [None]:
#Exercise 1.4 a)
#Number of distinct users
#userLibRDD.count()
userLibRDD.map(lambda line: line.split(';')[0]).distinct().count()

In [None]:
#Exercise 1.4 a)
#Number of distinct items
papersRDD.map(lambda line: line.split(',')[0]).distinct().count()

In [None]:
#Exercise 1.4 a)
#Number of total ratings by users
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: x[1].split(','))\
            .flatMap(lambda x: x)\
            .count()

In [None]:
#Exercise 1.4 b)
#Min number of ratings a user has given
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: (x[0], x[1].split(',')))\
            .mapValues(len)\
            .map(lambda x: x[1])\
            .min()

In [None]:
#Exercise 1.4 c)
#Max number of ratings a user has given
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: (x[0], x[1].split(',')))\
            .mapValues(len)\
            .map(lambda x: x[1])\
            .max()

In [None]:
#Exercise 1.4 d)
#Average number of ratings of users
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: (x[0], x[1].split(',')))\
            .mapValues(len)\
            .map(lambda x: x[1])\
            .mean()


In [None]:
#Exercise 1.4 e)
#Standard deviation for ratings of users
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: (x[0], x[1].split(',')))\
            .mapValues(len)\
            .map(lambda x: x[1])\
            .stdev()


In [None]:
#Exercise 1.4 f)
#Min number of ratings an item has received
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: x[1].split(','))\
            .flatMap(lambda x: x)\
            .map(lambda x: (x, 1))\
            .reduceByKey(lambda a, b: a + b)\
            .map(lambda x: x[1])\
            .min()

In [None]:
#Exercise 1.4 g)
#Max number of ratings an item has received
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: x[1].split(','))\
            .flatMap(lambda x: x)\
            .map(lambda x: (x, 1))\
            .reduceByKey(lambda a, b: a + b)\
            .map(lambda x: x[1])\
            .max()

In [None]:
#Exercise 1.4 h)
#Average number of ratings of items
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: x[1].split(','))\
            .flatMap(lambda x: x)\
            .map(lambda x: (x, 1))\
            .reduceByKey(lambda a, b: a + b)\
            .map(lambda x: x[1])\
            .mean()

In [None]:
#Exercise 1.4 i)
#Standard deviation for ratings of items
userLibRDD.map(lambda line: (line.split(';')[0], line.split(';')[1]))\
            .map(lambda x: x[1].split(','))\
            .flatMap(lambda x: x)\
            .map(lambda x: (x, 1))\
            .reduceByKey(lambda a, b: a + b)\
            .map(lambda x: x[1])\
            .stdev()

## Exercise 1.5 (Loading the dataset into Dataframes)

In [None]:
#Exercise 1.5
#loading data from users_libraries.txt into a dataframe
df = sparkSession.read.csv(path="./mod_users_libraries.txt", sep=';')
df.show(5, truncate=True)

In [None]:
#Exercise 1.5
#loading data from users_libraries.txt into a dataframe
#Loading the data in and RDD and splitting the paper_id id values
userLibraryRDD = sparkSession.sparkContext.textFile("./mod_users_libraries.txt")\
                .map(lambda x: (x.split(';')[0], x.split(';')[1]))\
                .map(lambda x: (x[0], x[1].split(',')))\
                .flatMapValues(lambda x: x)

#Creating the schema for the dataframe
schemaString = "user_hash_id paper_id"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
#Creating the dataframe
userLibDF = sparkSession.createDataFrame(userLibRDD, schema)
userLibDF.show(5, truncate=True)

In [None]:
#Exercise 1.5
#Loading papers.csv data into a dataframe
#Defining the column names
columns = ['paper_id', 'type', 'journal', 'book_title', \
           'series', 'publisher', 'pages', 'volume', \
           'number', 'year', 'month', 'postedate',\
           'address', 'title', 'abstract']
papersDF = sparkSession.read\
            .load("./papers.csv", format="csv", sep=",", inferSchema="true", quote='"', header="false")\
            .toDF(*columns)
papersDF.show(2, truncate=True)

In [None]:
#Exercise 1.5
#Loading stopwords_en.txt data into a dataframe
stopWordsDF = sparkSession.read\
                .load("./stopwords_en.txt", format="text", sep=" ", inferSchema="true", header="false")\
                .toDF('stop_word')
stopWordsDF.show(5, truncate=True)

## Exercise 1.6 (Tasks on top of Dataframes)
### Exercise 1.4 (Basic Analysis for Recommender Systems)

In [None]:
#Exercise 1.6
#Exercise 1.4 a) Using DataFrames (Basic Analysis for Recommender Systems)
#Number of distinct users
userLibDF.select("user_hash_id").distinct().count()

In [None]:
#Exercise 1.6
#Exercise 1.4 a) Using DataFrames (Basic Analysis for Recommender Systems)
#Number of distinct items
papersDF.select("paper_id").distinct().count()

In [None]:
#Exercise 1.6
#Exercise 1.4 a) Using DataFrames (Basic Analysis for Recommender Systems)
#Number of ratings
userLibDF.select("paper_id").count()

In [None]:
#Exercise 1.6
#Exercise 1.4 b) Using DataFrames (Basic Analysis for Recommender Systems)
#Min number of ratings a user has given
userLibDF.select("user_hash_id").groupBy("user_hash_id").count().agg(Funct.min("count")).show()

In [None]:
#Exercise 1.6
#Exercise 1.4 c) Using DataFrames (Basic Analysis for Recommender Systems)
#Max number of ratings a user has given
userLibDF.select("user_hash_id").groupBy("user_hash_id").count().agg(Funct.max("count")).show()

In [None]:
#Exercise 1.6
#Exercise 1.4 d) Using DataFrames (Basic Analysis for Recommender Systems)
#Average number of ratings of users
userLibDF.select("user_hash_id").groupBy("user_hash_id").count().agg(Funct.mean("count")).show()

In [None]:
#Exercise 1.6
#Exercise 1.4 e) Using DataFrames (Basic Analysis for Recommender Systems)
#Standard deviation for ratings of users
userLibDF.select("user_hash_id").groupBy("user_hash_id").count().agg(Funct.stddev("count")).show()

In [None]:
#Exercise 1.6
#Exercise 1.4 f) Using DataFrames (Basic Analysis for Recommender Systems)
#Min number of ratings an item has received
userLibDF.select("paper_id").groupBy("paper_id").count().agg(Funct.min("count")).show()

In [None]:
#Exercise 1.6
#Exercise 1.4 g) Using DataFrames (Basic Analysis for Recommender Systems)
#Max number of ratings an item has received
userLibDF.select("paper_id").groupBy("paper_id").count().agg(Funct.max("count")).show()

In [None]:
#Exercise 1.6
#Exercise 1.4 h) Using DataFrames (Basic Analysis for Recommender Systems)
#Average number of ratings of items
userLibDF.select("paper_id").groupBy("paper_id").count().agg(Funct.mean("count")).show()

In [None]:
#Exercise 1.6
#Exercise 1.4 i) Using DataFrames (Basic Analysis for Recommender Systems)
#Average number of ratings of items
userLibDF.select("paper_id").groupBy("paper_id").count().agg(Funct.stddev("count")).show()

## Exercise 1.6 (Tasks on top of Dataframes)
### Exercise 1.3 (Joining Collections)

In [None]:
#Exercise 1.6
#Exercise 1.3 Using Dataframes (Joining collections)
#Joining the userLib dataframe with the papersDF dataframe over paper_id
#Dropping any row that has null value in the abstract column
joinedDF = userLibDF.join(papersDF, userLibDF.paper_id == papersDF.paper_id, "inner")\
                        .select(userLibDF.user_hash_id,\
                                Funct.explode(Funct.split(papersDF.abstract," "))\
                                .alias("word"))\
                        .na.drop("any")

joinedDF.show(20, truncate=True)

In [None]:
#Exercise 1.6
#Exercise 1.3 Using Dataframes (Joining collections)
#Subtracting the stop words from the user and abstract words dataframe
withoutStpWrdDF = joinedDF.join(stopWordsDF, joinedDF.word==stopWordsDF.stop_word, how="left_anti")
withoutStpWrdDF.show(20, truncate=True)

In [None]:
#Exercise 1.6
#Exercise 1.3 Using Dataframes (Joining collections)
#Grouping the joinedDf dataframe over 'user_hash_id' and 'word' columns
#Sorting in ascending order the result w.r.t the 'user_hash_id' column
groupedDF = withoutStpWrdDF.groupBy("user_hash_id", "word")\
                    .agg(Funct.count("word").alias("word_count"))\
                    .sort(Funct.asc("user_hash_id"))

groupedDF.show(10, truncate=True)

In [None]:
#Exercise 1.6
#Exercise 1.3 Using Dataframes (Joining collections)
#Creating a window and using 'rank' function over the window to give a rank to the words per user and according 
#to the word count
window = Window.partitionBy("user_hash_id").orderBy(Funct.desc("word_count"))
rankedResultDF = groupedDF.withColumn("ranking", Funct.rank().over(window))
filteredRankedResDF = rankedResultDF.filter(rankedResultDF.ranking < 10).select("user_hash_id", "word", "ranking")
filteredRankedResDF.show(30, truncate=True)

In [None]:
#Exercise 1.6
#Exercise 1.3 Using Dataframes (Joining collections)
#Transforming the dataframe so that the top 10 most used words are combined into a list for eash user 
finalResultDF = filteredRankedResDF.groupBy("user_hash_id").agg(Funct.collect_list("word").alias("word"))
finalResultDF.show(10, truncate=True)

In [None]:
#Exercise 1.6
#Exercise 1.3 Using Dataframes (Joining collections)
#Saving the result as a text file
#finalResultDF.select(finalResultDF.user_hash_id, finalResultDF.word.cast("string")).write.csv(path='dataframe_result',mode='overwrite',sep=' ')