# <span style="color:blue">Advanced Databases and Information Systems Project I</span>
## Programming with Spark RDD and DataFrame

**Group Members**:
* Omar Swelam os132@uni-freiburg.de
* Jumshaid Khan jk1308@uni-freiburg.de

**Submitted to**: 
Dr. Fang Wei-Kleiner

**Repository:** https://github.com/iamjumshaid/adbis-projects

**<span style="color:red">Note:</span>** We have added your email `fwei@informatik.uni-freiburg.de` as collaborator to our private GitHub repository.

**References**:
* https://sparkbyexamples.com/pyspark/pyspark-map-transformation/
* https://sparkbyexamples.com/pyspark/pyspark-count/
* https://sparkbyexamples.com/pyspark/pyspark-explode-nested-array-into-rows/
* https://chat.openai.com/ [Used for understanding concepts, code optimisations, and pair programming]
* https://bard.google.com/ [Used for understanding concepts, code optimisations, and pair programming]
  
**Date:** 30.07.2023


In [11]:
from pyspark import SparkConf, SparkContext
import re
import csv
import time
import pandas as pd

### <span style="color:blue">Task 1.2 (Loading the dataset into an RDD)</span>

In [2]:
# Configuring Spark
# Create a SparkConf object that names the application and runs Spark locally on 3 cores, with 12 GB executor memory and 8 GB driver memory
conf = SparkConf().setAppName("LocalSparkCluster").setMaster("local[3]")
conf.set("spark.executor.memory", "12g")
conf.set("spark.driver.memory", "8g")
sc = SparkContext(conf=conf)

**a) userRatingsRDD: creating a pair RDD from 'users_libraries.txt' using the `user hash` as the key and the liked paper(s) `paper_id` as the value(s).**

In [5]:
start_time = time.time()

# Read the text file and create an RDD of lines, e.g. ['28d3f81251d94b09735497477a5e4e02;3929762,503574,5819422,4238883,5788061,46294', ...]
users_papers_rdd = sc.textFile("users_libraries.txt")

# Split each line on ';' into the user hash and its comma-separated paper IDs using the map() transformation
# e.g. [('28d3f81251d94b09735497477a5e4e02', ['3929762', '503574', '5819422', '4238883', ...]), ...]
users_rdd = users_papers_rdd.map(lambda line: line.split(";")).map(lambda x: (x[0], x[1].split(",")))

# Remove duplicate paper IDs per user with `mapValues()` (list -> set -> list, so the resulting order is arbitrary)
# and cache the RDD in memory so it is not recomputed by every later action.
split_users_rdd = users_rdd.mapValues(lambda x: list(set(x))).cache()

end_time = time.time()

split_users_rdd.take(5)

[('28d3f81251d94b09735497477a5e4e02',
  ['5984302',
   '3481823',
   '4636156',
   '635215',
   '3929762',
   '4810441',
   '503574',
   '4194836',
   '5490453',
   '3366480',
   '5996865',
   '462949',
   '5828780',
   '4450195',
   '4165233',
   '635216',
   '5819422',
   '4238883',
   '5788061',
   '4238942']),
 ('d0c9aaa788153daeaf1f1538b3d46bbb',
  ['5761055',
   '4226226',
   '5184704',
   '5760287',
   '226864',
   '913868',
   '3466838',
   '8806369',
   '2363430',
   '3281547',
   '4781370',
   '239571',
   '6614346',
   '5441098',
   '2080691',
   '1805577',
   '853030',
   '3512183',
   '4089758',
   '2855355',
   '3140015',
   '2445106',
   '4236212',
   '5687747',
   '678653',
   '1959511',
   '2841637',
   '767516',
   '7562861',
   '2049617',
   '6607628',
   '7570111',
   '12571584',
   '2653863',
   '6343346',
   '1012525',
   '2688186',
   '5336762',
   '1277953',
   '4140337',
   '7756088',
   '3190274',
   '2782576',
   '311570',
   '2080631']),
 ('f05bcffe7951de9e5

In [46]:
print('Time taken in creating the pair RDD from `users_libraries.txt`: %.2f seconds' % (end_time - start_time))

Time taken in creating the pair RDD from `users_libraries.txt`: 0.08 seconds
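To make the parsing step concrete outside Spark, here is a minimal pure-Python sketch of what the two `map()` calls and the `mapValues()` de-duplication do to a single line of `users_libraries.txt`. The helper name `parse_user_line` is hypothetical, and the sketch sorts the paper IDs only to make its output reproducible; the notebook's `list(set(x))` leaves them in arbitrary order.

```python
def parse_user_line(line):
    """Hypothetical helper: turn 'user_hash;id1,id2,...' into (user_hash, [unique paper ids])."""
    parts = line.split(";")                 # same split as the first map()
    user_hash = parts[0]
    paper_ids = parts[1].split(",")         # same split as the second map()
    return user_hash, sorted(set(paper_ids))  # de-duplicate (sorted only for reproducibility)


print(parse_user_line("28d3f81251d94b09735497477a5e4e02;3929762,503574,3929762"))
# ('28d3f81251d94b09735497477a5e4e02', ['3929762', '503574'])
```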


**b) paperTermsRDD: creating a pair RDD from 'papers.csv' using the `paper_id` as the key and the words contained in the abstract as the value(s).**

In [6]:
start_time = time.time()

# Read the text file and create an RDD of lines
papers_rdd = sc.textFile("papers.csv")
parsed_rdd = papers_rdd.map(lambda line: next(csv.reader([line])))

# From each row in `parsed_rdd`, keep only the first column (`paper_id`) and the last column (`abstract`)
extract_abstract_rdd = parsed_rdd.map(lambda row: (row[0],row[-1]))

# Split each abstract into a list of words on spaces and punctuation, drop empty strings, keep only purely
# alphabetic tokens (to be used later on), and cache the result to avoid re-computation.
split_papers_rdd = extract_abstract_rdd.mapValues(lambda x: list(filter(None, re.split("[' ()}{,.?-]", x))))\
                                       .mapValues(lambda x: [word for word in x if word.isalpha()]).cache()
end_time = time.time()

split_papers_rdd.take(5)

[('80546',
  ['the',
   'genetic',
   'code',
   'has',
   'been',
   'regarded',
   'as',
   'arbitrary',
   'in',
   'the',
   'sense',
   'that',
   'the',
   'codon',
   'amino',
   'acid',
   'assignments',
   'could',
   'be',
   'different',
   'than',
   'they',
   'actually',
   'are',
   'this',
   'general',
   'idea',
   'has',
   'been',
   'spelled',
   'out',
   'differently',
   'by',
   'previous',
   'often',
   'rather',
   'implicit',
   'accounts',
   'of',
   'arbitrariness',
   'they',
   'have',
   'drawn',
   'on',
   'the',
   'frozen',
   'accident',
   'theory',
   'on',
   'evolutionary',
   'contingency',
   'on',
   'alternative',
   'causal',
   'pathways',
   'and',
   'on',
   'the',
   'absence',
   'of',
   'direct',
   'stereochemical',
   'interactions',
   'between',
   'codons',
   'and',
   'amino',
   'acids',
   'it',
   'has',
   'also',
   'been',
   'suggested',
   'that',
   'the',
   'arbitrariness',
   'of',
   'the',
   'genetic',
   'c

In [51]:
print('Time taken in creating the pair RDD from `papers.csv`: %.2f seconds' % (end_time - start_time))

Time taken in creating the pair RDD from `papers.csv`: 0.08 seconds
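The tokenisation applied to each `papers.csv` line can be tried without Spark. The sketch below, with the hypothetical helper `parse_paper_line` and a made-up three-column line, parses the line with the `csv` module, keeps the first and last columns, splits the abstract on the same delimiter class as the notebook and keeps only purely alphabetic tokens, so a token such as `v2` is dropped. Because `sc.textFile()` splits its input on newlines, the sketch (like the notebook) assumes that no quoted abstract contains a line break.

```python
import csv
import re

# Same delimiter class as the notebook: apostrophe, space, parentheses,
# braces, comma, period, question mark and hyphen.
DELIMITERS = r"[' ()}{,.?-]"


def parse_paper_line(line):
    """Hypothetical helper: return (paper_id, [alphabetic words of the abstract])."""
    row = next(csv.reader([line]))           # handles quoted fields containing commas
    paper_id, abstract = row[0], row[-1]     # first and last column
    tokens = [t for t in re.split(DELIMITERS, abstract) if t]  # drop empty strings
    return paper_id, [t for t in tokens if t.isalpha()]


print(parse_paper_line('42,article,"the code, (v2) is fast."'))
# ('42', ['the', 'code', 'is', 'fast'])
```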


### <span style="color:blue">Task 1.3 (Joining Collections)</span>

**1.3.1 Getting the stopwords and creating a broadcast variable to share them among all workers of the Spark cluster**

In [8]:
# Reading the stopwords from the file and storing them in a list
with open('stopwords_en.txt', 'r') as file:
    stopwords_list = [line.rstrip('\n') for line in file]
    print(stopwords_list[:10])

['a', 'able', 'about', 'above', 'according', 'accordingly', 'across', 'actually', 'after', 'afterwards']


In [9]:
# Broadcast variable to share the stopwords among all the workers in the Spark cluster
stop_word_rdd = sc.broadcast(stopwords_list)

# This function returns the words of `words_list` that are not stopwords.
# It iterates through the list and checks whether each word is in the `stop_word_rdd` broadcast variable
# (a broadcast Python list, not an RDD); only the words that are not found are kept.
def stopwords_filter(words_list):
    return [word for word in words_list if word not in stop_word_rdd.value]
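A possible refinement, not what produced the timings in this report: `stop_word_rdd.value` is a Python list, so every `in` check scans the whole stopword list. Broadcasting a `set` instead (for example `sc.broadcast(set(stopwords_list))`) turns each check into a hash lookup while returning the same words. The pure-Python sketch below, with a hypothetical three-word stopword list, shows that both variants agree.

```python
def filter_stopwords(words, stopwords):
    """Return the words that are not stopwords; `stopwords` may be a list or a set."""
    return [word for word in words if word not in stopwords]


stopwords_list = ["a", "about", "the"]   # hypothetical excerpt of stopwords_en.txt
stopwords_set = set(stopwords_list)      # average O(1) membership test instead of a linear scan

words = ["the", "genetic", "code", "is", "about", "a", "code"]
print(filter_stopwords(words, stopwords_list))  # ['genetic', 'code', 'is', 'code']
print(filter_stopwords(words, stopwords_set))   # ['genetic', 'code', 'is', 'code']
```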

**1.3.2 Computing the top-10 most frequent words appearing in the papers for each user**

***Method 01:*** Using cartesian product

In [76]:
start_time = time.time()

# Creating a new papers RDD by removing the stopwords from the papers RDD created in Task 1.2 (b)
# paper_tuple[0] = paper_id
# paper_tuple[1] = [list of abstract words in the paper]
papers_without_stopwords_rdd = split_papers_rdd.map(lambda paper_tuple: (paper_tuple[0], stopwords_filter(paper_tuple[1])))

# Keep only papers whose abstract is a list of words. Since `stopwords_filter()` always returns a list,
# this is a safeguard that keeps every paper before they are combined with `split_users_rdd` (user -> paper IDs).
filtered_papers_without_stopwords_rdd = papers_without_stopwords_rdd.filter(lambda x: isinstance(x[1], list))

# `users_with_paper_abstracts_rdd` contains each user paired with the abstract words of every paper in their library.
# The `cartesian()` transformation pairs every user of `split_users_rdd` with every paper of `filtered_papers_without_stopwords_rdd`.
# The `filter()` transformation then keeps only the pairs where the paper ID is in the user's library,
# and the `map()` transformation turns each remaining pair into (user_id, [abstract words]).
users_with_paper_abstracts_rdd = split_users_rdd.cartesian(filtered_papers_without_stopwords_rdd)\
                                    .filter(lambda x: x[1][0] in x[0][1])\
                                    .map(lambda x: (x[0][0],x[1][1])).cache()

# The `reduceByKey()` transformation concatenates, per user ID, the word lists of all papers in the user's library.
users_words_rdd = users_with_paper_abstracts_rdd.reduceByKey(lambda x,y: x + y)

# `word_counts_rdd` contains, for each user, every word together with how often it appears in that user's abstracts.
# The `flatMap()` transformation emits ((user_id, word), 1) for every word occurrence,
# `reduceByKey()` sums these ones per (user_id, word) pair,
# and `map()` reshapes each element into (user_id, [word, count]).
word_counts_rdd = users_words_rdd.flatMap(lambda x: [((x[0], word), 1) for word in x[1]])\
                    .reduceByKey(lambda x, y: x+y)\
                    .map(lambda x: (x[0][0], [x[0][1],x[1]]))

# `sorted_word_counts_rdd` contains the users and their top 10 most frequent words.
# The `groupByKey()` transformation groups `word_counts_rdd` by user ID.
# The first `mapValues()` sorts each group by count in descending order (`reverse=True`) with `sorted()` and keeps the first 10 entries.
# The second `mapValues()` drops the counts, keeping only the words.
sorted_word_counts_rdd = word_counts_rdd.groupByKey()\
                            .mapValues(lambda x: sorted(x, key=lambda x:x[1], reverse=True)[:10])\
                            .mapValues(lambda x: [z[0] for z in x])

# Taking a random sample of 50 users (without replacement, seed 250) from `sorted_word_counts_rdd` to show their top 10 words.
result = sorted_word_counts_rdd.takeSample(False, 50, 250)

end_time = time.time()

result

[('da593ecae187d0c4cfe01b7d8719d6e6',
  ['health',
   'mental',
   'patient',
   'image',
   'literature',
   'treatment',
   'services',
   'review',
   'practice',
   'systematic']),
 ('8b87ac1e5903e5af7dee9e54317f1a7c',
  ['quantum',
   'field',
   'phase',
   'state',
   'states',
   'theory',
   'systems',
   'experimental',
   'single',
   'spin']),
 ('7c5dcabf1ac0304dc89eec3ce3efe305',
  ['path',
   'routing',
   'internet',
   'avail',
   'bw',
   'switching',
   'end',
   'paths',
   'quality',
   'network']),
 ('4988459274c4c2daceb488b219bf9462',
  ['protein',
   'proteins',
   'structure',
   'structures',
   'ribosome',
   'translation',
   'mrna',
   'folding',
   'resolution',
   'membrane']),
 ('41a47ecde12a9c3c74d0994e0452da67',
  ['software',
   'suggests',
   'computer',
   'paper',
   'soft',
   'literature',
   'paradigm',
   'management',
   'project',
   'artifacts']),
 ('6519728d752a57e236943589f77068b5',
  ['content',
   'network',
   'internet',
   'information

In [79]:
print('Time taken to compute the top-10 most frequent words appearing in the papers for each user using cartesian product method: %.2f seconds' % (end_time - start_time))

Time taken to compute the top-10 most frequent words appearing in the papers for each user using cartesian product method: 3615.92 seconds
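A back-of-the-envelope calculation with the counts reported in Task 1.4 shows why this method is slow: `cartesian()` builds one candidate pair for every (user, paper) combination, and `filter()` then scans the user's paper list for each of them, although only the rated (user, paper) pairs survive. The papers RDD has at least as many rows as distinct paper IDs, so the real number of pairs is at least this large.

```python
num_users = 28_416      # distinct users (Task 1.4 a)
num_papers = 172_079    # distinct papers in papers.csv (Task 1.4 a)
num_ratings = 828_481   # (user, paper) ratings (Task 1.4 a)

cartesian_pairs = num_users * num_papers        # pairs produced by cartesian()
kept_fraction = num_ratings / cartesian_pairs   # share of pairs that survive filter()

print(f"{cartesian_pairs:,} candidate pairs")            # 4,889,796,864 candidate pairs
print(f"about 1 in {round(1 / kept_fraction):,} kept")   # about 1 in 5,902 kept
```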


***Method 02:*** Using join function

**<span style="color:red">Note:</span>** Our first approach took about an hour (3615.92 seconds), so we replaced the cartesian product with a join on `paper_id` to speed it up.

In [25]:
start_time = time.time()

# Creating a new papers RDD by removing the stopwords from the papers RDD created in Task 1.2 (b)
# paper_tuple[0] = paper_id
# paper_tuple[1] = [list of abstract words in the paper]
papers_without_stopwords_rdd = split_papers_rdd.map(lambda paper_tuple: (paper_tuple[0], stopwords_filter(paper_tuple[1])))

# Keep only papers whose abstract is a list of words (a safeguard, since `stopwords_filter()` always returns a list)
# so that the papers can be joined with the users' libraries below
filtered_papers_without_stopwords_rdd = papers_without_stopwords_rdd.filter(lambda x: isinstance(x[1], list))

# `users_abstracts_rdd` contains one (paper_id, user_id) pair for every paper in every user's library.
# The `flatMapValues()` transformation flattens `split_users_rdd` into (user_id, paper_id) pairs,
# and the `map()` transformation swaps them to (paper_id, user_id) so that the paper ID becomes the join key.
# The paper IDs stay strings so that they match the string keys of `filtered_papers_without_stopwords_rdd`
# (integer keys would never be equal to string keys, and the join would return nothing).
users_abstracts_rdd = split_users_rdd.flatMapValues(lambda x: x).map(lambda x: (x[1], x[0]))

# `filtered_papers_without_stopwords_rdd` has [(paper_id, abstract_words),...], so we join it directly with `users_abstracts_rdd` [(paper_id, user_id),...]
# As a result we get [(paper_id, (user_id, [list of abstract words without stopwords])),...]
users_papers_with_abstracts_rdd = users_abstracts_rdd.join(filtered_papers_without_stopwords_rdd)
users_papers_with_abstracts_rdd.take(5)  # quick inspection; this extra action is included in the measured time

# The `map()` transformation drops the paper ID, turning each element into (user_id, [abstract words])
users_abstracts_rdd = users_papers_with_abstracts_rdd.map(lambda x: (x[1][0], x[1][1]))
users_abstracts_rdd.take(5)  # quick inspection; this extra action is included in the measured time

# `users_words_rdd` contains one (user_id, word) pair for every word occurrence in the abstracts of the papers in each user's library;
# it is built by flattening `users_abstracts_rdd` with `flatMapValues()`.
# The `map()` transformation then turns each pair into ((user_id, word), 1),
# `reduceByKey()` sums these ones to count how often each word appears per user,
# and the final `map()` reshapes each element into (user_id, [word, count]).
users_words_rdd = users_abstracts_rdd.flatMapValues(lambda x: x)
users_words_counts = users_words_rdd.map(lambda x: (x,1)).reduceByKey(lambda x, y: x+y)\
                                  .map(lambda x: (x[0][0], [x[0][1],x[1]])).cache()

# `sorted_word_counts_rdd` contains the users and their top 10 most frequent words.
# The `groupByKey()` transformation groups `users_words_counts` by user ID.
# The first `mapValues()` sorts each group by count in descending order (`reverse=True`) with `sorted()` and keeps the first 10 entries.
# The second `mapValues()` drops the counts, keeping only the words.
sorted_word_counts_rdd = users_words_counts.groupByKey()\
                            .mapValues(lambda x: sorted(x, key=lambda x:x[1], reverse=True)[:10])\
                            .mapValues(lambda x: [z[0] for z in x])

result = sorted_word_counts_rdd.takeSample(False,50,250)

end_time = time.time()

result

[('3b07fa777afe1d573a80b86a684b6584',
  ['applications',
   'materials',
   'devices',
   'fabrication',
   'mechanical',
   'pdms',
   'silver',
   'actuator',
   'properties',
   'printed']),
 ('2a827b912e95043f5438835e4a82dcdd',
  ['genetic',
   'programming',
   'problems',
   'evolutionary',
   'gene',
   'networks',
   'show',
   'evolution',
   'paper',
   'search']),
 ('f6eb7bc494849bf723769935b8756cb7',
  ['metabolic',
   'microbial',
   'models',
   'communities',
   'interactions',
   'based',
   'community',
   'species',
   'network',
   'approach']),
 ('f40b76a7461831ece29917205e35d396',
  ['social',
   'child',
   'awareness',
   'research',
   'practice',
   'article',
   'gender',
   'relation',
   'fathers',
   'work']),
 ('db242f928ea40198cf01163dda7c7a1a',
  ['bayesian',
   'networks',
   'causal',
   'agent',
   'based',
   'simulation',
   'services',
   'probability',
   'model',
   'conservation']),
 ('28f2453b6a2a7a25061feb8b0f491969',
  ['accessibility',
   'w

In [26]:
print('Time taken to compute the top-10 most frequent words appearing in the papers for each user using join function method: %.2f seconds' % (end_time - start_time))

Time taken to compute the top-10 most frequent words appearing in the papers for each user using join function method: 414.56 seconds
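The join-based method can be summarised in plain Python: index the abstracts by paper ID, look up every paper in each user's library (an inner join, so papers without an abstract are skipped), count the words per user and keep the 10 most frequent. The sketch below uses hypothetical toy data and the hypothetical helper `top_k_words_per_user`; `Counter.most_common(k)` plays the role of `sorted(..., reverse=True)[:10]`. In the RDD version, `heapq.nlargest(10, x, key=lambda x: x[1])` inside `mapValues()` would be an alternative that avoids sorting each whole group.

```python
from collections import Counter, defaultdict


def top_k_words_per_user(user_papers, paper_words, k=10):
    """Return {user_id: [top-k words]} from user libraries and stopword-free abstracts.

    user_papers: iterable of (user_id, [paper_id, ...]), like split_users_rdd
    paper_words: dict paper_id -> [word, ...], like filtered_papers_without_stopwords_rdd
    """
    counts = defaultdict(Counter)
    for user_id, papers in user_papers:
        for paper_id in papers:
            words = paper_words.get(paper_id)   # inner join: papers without an abstract are skipped
            if words is not None:
                counts[user_id].update(words)   # word counts per (user_id, word)
    # most_common(k) orders words by count (descending) and keeps the first k
    return {user: [word for word, _ in c.most_common(k)] for user, c in counts.items()}


user_papers = [("u1", ["p1", "p2"]), ("u2", ["p2", "p9"])]
paper_words = {"p1": ["spark", "rdd", "spark"], "p2": ["spark", "join"]}
result = top_k_words_per_user(user_papers, paper_words, k=2)
print(result)
```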


**1.3.3 Storing the top-10 most frequent words appearing in the papers for each user in a text file**

In [17]:
# `store_results_to_file()` takes a list of (user_hash, words_list) tuples and writes one line per user
# to `filename`, in the form `user_hash: word1, word2, ...`.
def store_results_to_file(user_top_10_words, filename):
    with open(filename, 'w') as file:
        for user_hash, words_list in user_top_10_words:
            line = f"{user_hash}: {', '.join(words_list)}\n"
            file.write(line)

In [32]:
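# `result` holds the 50 users sampled by `takeSample()` above, so only these users are written to the file
store_results_to_file(result, "user_top_frequent_words.txt")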

### <span style="color:blue">Task 1.4 (Basic Analysis for Recommender Systems)</span>

**a) Number of (distinct) users, number of (distinct) items, and number of ratings**

In [9]:
print("Number of distinct users is " + str(split_users_rdd.map(lambda x: x[0]).distinct().count()))

Number of distinct users is 28416


In [10]:
print("Number of distinct papers (items) is " + str(split_papers_rdd.map(lambda x: x[0]).distinct().count()))

Number of distinct papers (items) is 172079


In [11]:
print("Total number of ratings is " + str(split_users_rdd.flatMap(lambda x: x[1]).count()))

Total number of ratings is 828481


**b) Min number of ratings a user has given**

In [13]:
# `users_stats` contains each user ID together with the number of papers in that user's library.
# The `mapValues()` transformation maps each list of papers in `split_users_rdd` to its length,
# and `cache()` keeps `users_stats` in memory because the following statistics reuse it.
users_stats = split_users_rdd.mapValues(lambda x: len(x)).cache()

In [14]:
print("Min number of ratings a user has given is " + str(users_stats.map(lambda x: x[1]).min()))


Min number of ratings a user has given is 1


**c) Max number of ratings a user has given**

In [15]:
print("Max number of ratings a user has given is " + str(users_stats.map(lambda x: x[1]).max()))

Max number of ratings a user has given is 1922


**d) Average number of ratings of users**

In [16]:
print("Average number of ratings per user is " + str(users_stats.map(lambda x: x[1]).mean()))

Average number of ratings per user is 29.155440596846848


**e) Standard deviation for ratings of users**

In [17]:
# RDD.stdev() computes the population standard deviation (divisor N)
print("Standard deviation of the number of ratings per user is " + str(users_stats.map(lambda x: x[1]).stdev()))

Standard deviation of the number of ratings per user is 81.1751761366871


**f) Min number of ratings an item has received**

In [23]:
# `papers_stats` contains each paper ID together with the number of users who have it in their library.
# The `flatMapValues()` transformation flattens `split_users_rdd` into (user_id, paper_id) pairs,
# `map()` turns each pair into (paper_id, 1),
# `reduceByKey()` sums the ones per paper ID,
# and `cache()` keeps `papers_stats` in memory for the following statistics.
papers_stats = split_users_rdd.flatMapValues(lambda x: x).map(lambda x: (x[1],1)).reduceByKey(lambda x,y: x+y).cache()


In [24]:
print("Min number of ratings an item has received is " + str(papers_stats.map(lambda x: x[1]).min()))


Min number of ratings an item has received is 3


**g) Max number of ratings an item has received**

In [25]:
print("Max number of ratings an item has received is " + str(papers_stats.map(lambda x: x[1]).max()))

Max number of ratings an item has received is 924


**h) Average number of ratings of items**

In [26]:
print("Average number of ratings per item is " + str(papers_stats.map(lambda x: x[1]).mean()))

Average number of ratings per item is 4.814538671191683


**i) Standard deviation for ratings of items**

In [28]:
# RDD.stdev() computes the population standard deviation (divisor N)
print("Standard deviation of the number of ratings per item is " + str(papers_stats.map(lambda x: x[1]).stdev()))

Standard deviation of the number of ratings per item is 5.477802292314591
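The per-user and per-item statistics above reduce to counting: a user's rating count is the length of their de-duplicated library, an item's count is the number of libraries that contain it, and the means are simply the number of ratings divided by the number of users or items (828,481 / 28,416 ≈ 29.155 and 828,481 / 172,079 ≈ 4.815). The pure-Python sketch below mirrors the RDD computations on a hypothetical three-user toy library; it uses `statistics.pstdev()` because `RDD.stdev()` computes the population standard deviation.

```python
import statistics
from collections import Counter

# Hypothetical toy libraries: user hash -> de-duplicated paper IDs
libraries = {
    "u1": ["p1", "p2"],
    "u2": ["p2"],
    "u3": ["p1", "p2", "p3"],
}

# Ratings per user (like users_stats) and per item (like papers_stats)
user_counts = [len(papers) for papers in libraries.values()]
ratings_per_item = Counter(p for papers in libraries.values() for p in papers)
item_counts = list(ratings_per_item.values())

num_ratings = sum(user_counts)   # every (user, paper) pair is one rating

print(min(user_counts), max(user_counts), statistics.mean(user_counts), statistics.pstdev(user_counts))
print(min(item_counts), max(item_counts), statistics.mean(item_counts), statistics.pstdev(item_counts))
```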


### <span style="color:blue">Task 1.5 (Loading the dataset into Data Frames)</span>

In [49]:
import re
import time
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, explode, split, udf, row_number
from pyspark.sql.types import ArrayType, IntegerType, StringType
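# Note: importing sum, max and min this way shadows Python's built-in functions of the same name for the rest of the notebook
from pyspark.sql.functions import sum, avg, max, min, stddev, count, countDistinct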

In [2]:
# Creating a SparkSession similar to the one in Task 1.2, but with 2 local cores, 8 GB executor memory and 4 GB driver memory
spark = SparkSession.builder \
    .appName("LocalSparkCluster") \
    .master("local[2]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Accessing the SparkContext 
sc_df = spark.sparkContext

**Creating papersDataFrame with suitable schema**

In [50]:
start_time = time.time()
papers_schema = "`paper_id` INT, `type` STRING, `journal` STRING, `book_title` STRING, `series` STRING, `publisher` STRING, \
                `pages` INT, `volume` INT, `number` INT, `year` INT, `month` STRING, `postedat` STRING, `address` STRING, \
                `title` STRING, `abstract` STRING"

# The DataFrame contains the paper IDs and abstracts from the `papers.csv` file. 
papers_df = spark.read.csv('papers.csv', header=False, schema=papers_schema)['paper_id','abstract']
papers_df.show(5,truncate=True)

end_time = time.time()

+--------+--------------------+
|paper_id|            abstract|
+--------+--------------------+
|   80546|the genetic code ...|
| 5842862|choosing good pro...|
| 1242600|although scientis...|
| 3467077|"many scientists ...|
|  309395|there is increasi...|
+--------+--------------------+
only showing top 5 rows



In [52]:
print('Time taken in creating the papers DataFrame from `papers.csv`: %.2f seconds' % (end_time - start_time))

Time taken in creating the papers DataFrame from `papers.csv`: 0.18 seconds


**Creating usersLibraryDataFrame with suitable schema**

In [53]:
start_time = time.time()
users_library_schema = "`user_id` STRING, `papers` STRING"

users_library_df = spark.read.csv("users_libraries.txt", header=False, sep=";", schema=users_library_schema)
users_library_df = users_library_df.withColumn("papers", split(users_library_df.papers, ',').cast(ArrayType(IntegerType())))
users_library_df.show(5,truncate=True)

end_time = time.time()

+--------------------+--------------------+
|             user_id|              papers|
+--------------------+--------------------+
|28d3f81251d94b097...|[3929762, 503574,...|
|d0c9aaa788153daea...|[2080631, 6343346...|
|f05bcffe7951de9e5...|[1158654, 478707,...|
|ca4f1ba4094011d9a...|            [278019]|
|d1d41a15201915503...|[6610569, 6493797...|
+--------------------+--------------------+
only showing top 5 rows



In [54]:
print('Time taken in creating the users DataFrame from `users_libraries.txt`: %.2f seconds' % (end_time - start_time))

Time taken in creating the users DataFrame from `users_libraries.txt`: 0.16 seconds


### <span style="color:blue">Task 1.6 (Tasks on top of DataFrames)</span>

**Replicating Task 1.3 using Spark DataFrames**

***1.3.1 Getting the stopwords and creating a broadcast variable to share them among all workers of the Spark cluster***

In [56]:
# Reading the stopwords from the file and storing them in a list
with open('stopwords_en.txt', 'r') as file:
    stopwords_list = [line.rstrip('\n') for line in file]
    print(stopwords_list[:10])

['a', 'able', 'about', 'above', 'according', 'accordingly', 'across', 'actually', 'after', 'afterwards']


In [57]:
# Broadcast variable to share the stopwords among all the workers in the Spark cluster
stop_word_rdd = sc_df.broadcast(stopwords_list)

# This function returns the words of `words_list` that are not stopwords.
# It iterates through the list and checks whether each word is in the `stop_word_rdd` broadcast variable
# (a broadcast Python list, not an RDD); only the words that are not found are kept.
def stopwords_filter(words_list):
    return [word for word in words_list if word not in stop_word_rdd.value]

***1.3.2 Computing the top-10 most frequent words appearing in the papers for each user***

In [58]:
# The function tokenises a paper abstract, removes the stopwords and keeps only purely alphabetic words
def abstract_processing(x):
    if x is None:
        return None
    words = re.split("[' ()}{,.?-]", x)
    words = [word for word in words if word not in stop_word_rdd.value]
    return list(filter(str.isalpha, words))

# `split_function` wraps `abstract_processing` as a UDF (User Defined Function) returning an array of strings
split_function = udf(abstract_processing, ArrayType(StringType()))

# `withColumn()` replaces the `abstract` column with the word array computed by the UDF
papers_df = papers_df.withColumn("abstract", split_function(col("abstract")))
papers_df.show(5)

+--------+--------------------+
|paper_id|            abstract|
+--------+--------------------+
|   80546|[genetic, code, r...|
| 5842862|[choosing, good, ...|
| 1242600|[scientists, typi...|
| 3467077|[scientists, mana...|
|  309395|[increasing, conc...|
+--------+--------------------+
only showing top 5 rows



In [59]:
# Replacing the `papers` column of `users_library_df` using the `explode()` function,
# which turns each element of the paper array into its own (user_id, papers) row
users_paper_pair = users_library_df.withColumn("papers", explode(users_library_df.papers))
users_paper_pair.show(5)

+--------------------+-------+
|             user_id| papers|
+--------------------+-------+
|28d3f81251d94b097...|3929762|
|28d3f81251d94b097...| 503574|
|28d3f81251d94b097...|5819422|
|28d3f81251d94b097...|4238883|
|28d3f81251d94b097...|5788061|
+--------------------+-------+
only showing top 5 rows
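`explode()` turns each element of an array column into its own row and repeats the other columns. One detail worth knowing: rows whose array is null or empty disappear, whereas `explode_outer()` keeps them with a null value. The pure-Python emulation below, with a hypothetical helper and toy rows, shows both behaviours.

```python
def explode(rows, outer=False):
    """Emulate explode() / explode_outer() on (user_id, papers) rows.

    Rows whose array is None or empty produce no output with explode();
    with outer=True they produce a single (user_id, None) row, like explode_outer().
    """
    out = []
    for user_id, papers in rows:
        if papers:
            out.extend((user_id, paper) for paper in papers)
        elif outer:
            out.append((user_id, None))
    return out


rows = [("u1", [3929762, 503574]), ("u2", [278019]), ("u3", [])]
print(explode(rows))              # [('u1', 3929762), ('u1', 503574), ('u2', 278019)]
print(explode(rows, outer=True))  # the same rows plus ('u3', None)
```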



In [60]:
start_time = time.time()

# Using an `inner` join of `users_paper_pair` and `papers_df` on the paper ID
# to ensure that only rows that exist in both DataFrames are included in the result.
users_abstract = users_paper_pair.join(papers_df, users_paper_pair.papers == papers_df.paper_id, "inner")

# `explode()` creates one row per word occurrence, so each row of `users_abstract_words`
# holds one word (`abstract_words`) from the abstract of a paper that the user has liked.
users_abstract_words = users_abstract.withColumn("abstract_words", explode(col("abstract")))['user_id','abstract_words']

# Grouping by `user_id` and `abstract_words` and applying the `count()` aggregation
# gives the number of times each word appears in the abstracts of the papers that a user likes.
users_words_count = users_abstract_words.groupBy("user_id", "abstract_words").count()

# Defining a window that partitions `users_words_count` by user ID
# and orders the rows of each partition by the `count` column in descending order.
windowPartition = Window.partitionBy("user_id").orderBy(col("count").desc())

# Adding a new column `ranking` to `users_words_count` with `row_number()`, which numbers the rows
# of each window partition 1, 2, 3, ... by descending count; words with equal counts are numbered in no guaranteed order.
users_words_count = users_words_count.withColumn("ranking", row_number().over(windowPartition))

users_words_count.show()

endtime = time.time()

+--------------------+--------------+-----+-------+
|             user_id|abstract_words|count|ranking|
+--------------------+--------------+-----+-------+
|00095808cdc611fb5...|        errors|    5|      1|
|00095808cdc611fb5...|          text|    3|      2|
|00095808cdc611fb5...|   information|    3|      3|
|00095808cdc611fb5...|        impact|    2|      4|
|00095808cdc611fb5...|           web|    2|      5|
|00095808cdc611fb5...|          list|    2|      6|
|00095808cdc611fb5...|    department|    2|      7|
|00095808cdc611fb5...|   recognition|    2|      8|
|00095808cdc611fb5...| automatically|    2|      9|
|00095808cdc611fb5...|         error|    2|     10|
|00095808cdc611fb5...|          site|    2|     11|
|00095808cdc611fb5...|          data|    2|     12|
|00095808cdc611fb5...|     character|    2|     13|
|00095808cdc611fb5...|       problem|    2|     14|
|00095808cdc611fb5...|      analyzed|    1|     15|
|00095808cdc611fb5...|       induced|    1|     16|
|00095808cdc

In [62]:
print('Time taken in showing the words count: %.2f seconds' % (endtime - start_time))

Time taken in showing the words count: 96.99 seconds


In [18]:
# Showing the top-10 most frequent words appearing in the papers for each user
result = users_words_count.filter(col("ranking") <= 10)
result.show()

+--------------------+--------------+-----+-------+
|             user_id|abstract_words|count|ranking|
+--------------------+--------------+-----+-------+
|00095808cdc611fb5...|        errors|    5|      1|
|00095808cdc611fb5...|          text|    3|      2|
|00095808cdc611fb5...|   information|    3|      3|
|00095808cdc611fb5...|        impact|    2|      4|
|00095808cdc611fb5...|           web|    2|      5|
|00095808cdc611fb5...|          list|    2|      6|
|00095808cdc611fb5...|    department|    2|      7|
|00095808cdc611fb5...|   recognition|    2|      8|
|00095808cdc611fb5...| automatically|    2|      9|
|00095808cdc611fb5...|         error|    2|     10|
|000ac87bf9c1623ee...| consciousness|   14|      1|
|000ac87bf9c1623ee...|         place|    2|      2|
|000ac87bf9c1623ee...|       mystery|    2|      3|
|000ac87bf9c1623ee...|       account|    2|      4|
|000ac87bf9c1623ee...|         world|    2|      5|
|000ac87bf9c1623ee...|       problem|    2|      6|
|000ac87bf9c
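`row_number()` gives every row of a partition a distinct, consecutive number, so when several words share a count (for user `00095808cdc611fb5...`, the count 2 spans rankings 4 to 14 in the output above), which of them land in the top 10 depends on an unspecified order. `rank()`, which is already imported, instead gives tied rows the same rank (and can therefore return more than 10 rows per user). The pure-Python sketch below, with a hypothetical helper, emulates both functions for one partition using the first rows shown above. Adding a second sort key such as `col("abstract_words")` to the window's `orderBy()` would be one way to make the `row_number()` selection deterministic.

```python
def row_number_and_rank(word_counts):
    """Emulate row_number() and rank() over one window partition.

    word_counts: list of (word, count) for a single user, already ordered by count descending.
    Returns (word, count, row_number, rank) tuples.
    """
    out = []
    prev_count = None
    current_rank = 0
    for position, (word, count) in enumerate(word_counts, start=1):
        if count != prev_count:      # a new count value starts a new rank
            current_rank = position
            prev_count = count
        out.append((word, count, position, current_rank))
    return out


ranked = row_number_and_rank([("errors", 5), ("text", 3), ("information", 3), ("impact", 2)])
for row in ranked:
    print(row)
# ('errors', 5, 1, 1)
# ('text', 3, 2, 2)
# ('information', 3, 3, 2)
# ('impact', 2, 4, 4)
```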

***1.3.3 Storing the result of top-10 most frequent words appearing in the papers for each user in a text file***

In [23]:
result.write.csv('user_top_frequent_words_using_df', header=True, mode='overwrite')

**Replicating Task 1.4 using Spark DataFrames**

***a) Number of (distinct) users, number of (distinct) items, and number of ratings***

In [35]:
distinct_user_count = users_library_df.select(countDistinct("user_id")).collect()[0][0]
print("Number of distinct users are " + str(distinct_user_count))

Number of distinct users are 28416


***b) Min number of ratings a user has given***

In [41]:
# `F.size()` returns the length of the `papers` array without the overhead of a Python UDF
users_stats_df = users_library_df.withColumn("no_papers", F.size(col("papers")))
users_stats_df.agg(min("no_papers")).show()

+--------------+
|min(no_papers)|
+--------------+
|             1|
+--------------+



***c) Max number of ratings a user has given***

In [42]:
users_stats_df.agg(max("no_papers")).show()

+--------------+
|max(no_papers)|
+--------------+
|          1922|
+--------------+



***d) Average number of ratings of users***

In [39]:
users_stats_df.agg(avg("no_papers")).show()

+------------------+
|    avg(no_papers)|
+------------------+
|29.155440596846848|
+------------------+



***e) Standard deviation for ratings of users***

In [40]:
users_stats_df.agg(stddev("no_papers")).show()

+----------------------+
|stddev_samp(no_papers)|
+----------------------+
|     81.17660451011594|
+----------------------+



***f) Min number of ratings an item has received***

In [43]:
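# `distinct()` drops duplicate (user_id, papers) rows, mirroring the set-based de-duplication of the RDD version
paper_counts = users_paper_pair.distinct().groupBy("papers").agg(count("*").alias("paper_count"))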
paper_counts.agg(min("paper_count")).show()

+----------------+
|min(paper_count)|
+----------------+
|               3|
+----------------+



***g) Max number of ratings an item has received***

In [45]:
paper_counts.agg(max("paper_count")).show()

+----------------+
|max(paper_count)|
+----------------+
|             924|
+----------------+



***h) Average number of ratings of items***

In [46]:
paper_counts.agg(avg("paper_count")).show()

+----------------+
|avg(paper_count)|
+----------------+
|4.81453867119172|
+----------------+



***i) Standard deviation for ratings of items***

In [47]:
paper_counts.agg(stddev("paper_count")).show()

+------------------------+
|stddev_samp(paper_count)|
+------------------------+
|       5.477818208917296|
+------------------------+
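The standard deviations differ slightly between the two implementations (81.1751761366871 vs 81.17660451011594 for users, 5.477802292314591 vs 5.477818208917296 for items) because `RDD.stdev()` computes the population standard deviation (divisor N), whereas the DataFrame `stddev()` is an alias of `stddev_samp()` (divisor N - 1), as the column names above show; `RDD.sampleStdev()` would match the DataFrame results. The sketch below converts the RDD values with the factor sqrt(N / (N - 1)), taking N = 28,416 users and N = 172,079 rated items (the latter is consistent with the item mean 828,481 / 172,079 ≈ 4.8145). The helper name is hypothetical.

```python
import math
import statistics


def sample_std_from_population_std(pop_std, n):
    """Convert a population standard deviation (divisor n) into a sample one (divisor n - 1)."""
    return pop_std * math.sqrt(n / (n - 1))


# RDD stdev() values from Task 1.4 and the corresponding number of users / items
users_sample_std = sample_std_from_population_std(81.1751761366871, 28_416)
items_sample_std = sample_std_from_population_std(5.477802292314591, 172_079)
print(f"{users_sample_std:.6f} {items_sample_std:.6f}")  # 81.176605 5.477818

# The same relationship on a tiny list
data = [1, 2, 3, 4]
print(statistics.stdev(data), sample_std_from_population_std(statistics.pstdev(data), len(data)))
```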



**Performance difference between RDDs and DataFrames**

**1. Execution time in seconds for creating Data Models**:

* Pair RDD for 'users_libraries.txt': 0.08 seconds
* DataFrame for 'users_libraries.txt': 0.16 seconds
----------------------------------------
* Pair RDD for 'papers.csv': 0.08 seconds
* DataFrame for 'papers.csv': 0.18 seconds

**2. Execution time in seconds for finding the top-10 most frequent words per user**:

* Using RDD (Cartesian method): 3615.92 seconds
* Using RDD (Join method): 414.56 seconds
----------------------------------------
* Using DataFrames: 96.99 seconds
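Expressed as ratios, the top-10 timings above correspond roughly to the following speed-ups. They are only indicative, since the runs were measured under different conditions (for example `local[3]` for the RDD runs and `local[2]` for the DataFrame run).

```python
timings = {
    "RDD (cartesian method)": 3615.92,
    "RDD (join method)": 414.56,
    "DataFrames": 96.99,
}

join_vs_cartesian = timings["RDD (cartesian method)"] / timings["RDD (join method)"]
df_vs_join = timings["RDD (join method)"] / timings["DataFrames"]
df_vs_cartesian = timings["RDD (cartesian method)"] / timings["DataFrames"]

print(f"{join_vs_cartesian:.1f}x {df_vs_join:.1f}x {df_vs_cartesian:.1f}x")  # 8.7x 4.3x 37.3x
```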

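**<span style="color:red">Note:</span>** Since the data is structured, DataFrames gave better performance in our experiments: apart from the `abstract_processing` UDF, their operations (`join()`, `explode()`, `groupBy().count()`, window functions) are optimised by Spark's Catalyst optimizer and executed inside the JVM, whereas every `map()`/`reduceByKey()` lambda of the RDD pipelines runs in Python worker processes. The timings are not a strict like-for-like comparison, though:
* RDD transformations are lazy, so the pair-RDD creation times only measure building the lineage, while the DataFrame cells also ran a `show(5)` job.
* The DataFrame `SparkSession` ran on `local[2]` with 8 GB executor and 4 GB driver memory, while the RDD `SparkContext` ran on `local[3]` with 12 GB and 8 GB.
* The DataFrame top-10 timing stops after `show()`, which may not evaluate every output partition, whereas `takeSample()` in the RDD methods processes the whole RDD.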