# Lista02 - Introduction to Resilient Distributed Dataset (RDD)

The goal of this notebook is to show some examples of using RDDs. To do that, we will use the ml-100k dataset (https://grouplens.org/datasets/movielens/100k/), which comprises a set of movie ratings. For our purposes, this dataset consists mainly of three files:

1. u.item: containing information about the movies (e.g. movie id, title, release date, etc.)
2. u.user: containing user data (e.g. user id, age, gender, and occupation)
3. u.data: containing the ratings that users gave to the movies

In this practical class, we will see how to create RDDs from the contents of those files, use transformations to derive new RDDs, and use actions to answer the following questions:

1. What are the percentages of male and female reviewers?
2. How many evaluations did our top-10 reviewers make?
3. What is the average rating of our top-10 movies?
4. What is the average rating given by male reviewers? And what about females?
5. Which movie can be considered the "best one"? And the worst one?
6. What are the name and age of the user who has made the most evaluations?
7. Are there any users who did not evaluate any movie?

You should start by loading those files as RDDs and listing some of their records to understand their formats.
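Before touching Spark, it can help to see what parsing a single record looks like. The sketch below assumes the layouts described in the dataset's README (u.user is pipe-delimited, u.data is tab-delimited). The two sample lines and the names `User`, `Rating`, `parse_user`, and `parse_rating` are made up for illustration and are not part of this notebook.

```python
from typing import NamedTuple


class User(NamedTuple):
    user_id: int
    age: int
    gender: str
    occupation: str


class Rating(NamedTuple):
    user_id: int
    item_id: int
    rating: int
    timestamp: int


def parse_user(line: str) -> User:
    # Assumed layout: user id | age | gender | occupation | zip code
    fields = line.split('|')
    return User(int(fields[0]), int(fields[1]), fields[2], fields[3])


def parse_rating(line: str) -> Rating:
    # Assumed layout: user id <TAB> item id <TAB> rating <TAB> timestamp
    user_id, item_id, rating, timestamp = line.split('\t')
    return Rating(int(user_id), int(item_id), int(rating), int(timestamp))


# Hypothetical sample lines, written by hand in the assumed layouts.
sample_user = parse_user('7|30|F|engineer|00000')
sample_rating = parse_rating('7\t42\t4\t880000000')
print(sample_user)
print(sample_rating)
```

Once the RDDs exist, functions like these could in principle be passed to `map`, for example `usersRdd.map(parse_user)`, instead of repeating `split` calls inside each lambda.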


## Exercise 0: Creating RDDs from our files

In [1]:
from pyspark import SparkConf, SparkContext
import collections


In [2]:
conf = SparkConf().setMaster("local[*]").setAppName("FirstSparkApp")
sc = SparkContext(conf = conf)
sc.uiWebUrl

'http://f9175e11752a:4043'

In [3]:
usersRdd = sc.textFile('u.user')
usersRdd.setName('users').cache()
usersRdd.getNumPartitions()
# usersRdd.take(10)

1

In [4]:
# u.data columns (tab-separated): user id, item id, rating, timestamp
ratingsRdd = sc.textFile('u.data')
ratingsRdd.setName('ratings').cache()

# ratingsRdd.take(10)

ratings MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
moviesRdd = sc.textFile('u.item')
# moviesRdd.take(10)

## Exercise 1: What are the percentages of male and female reviewers?

### Solution a:

In [6]:
# Count users per gender (the third field of each u.user record).
percentageCol = usersRdd.map(lambda line : (line.split('|')[2], 1)).countByKey()
numberOfUsers = percentageCol['M']+percentageCol['F']
print(percentageCol)
print("Male reviewers: % 5.2f %%" %(round(100*percentageCol['M']/numberOfUsers,2)))
print("Female reviewers: % 5.2f %%" %(round(100*percentageCol['F']/numberOfUsers,2)))

defaultdict(<class 'int'>, {'M': 670, 'F': 273})
Male reviewers:  71.05 %
Female reviewers:  28.95 %
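
`countByKey()` is an action: as the `defaultdict` in the output shows, it returns a plain Python dictionary to the driver rather than a new RDD. A rough single-machine analogue, runnable without Spark, is sketched below. The `lines` list is hypothetical sample data, and `collections.Counter` only stands in for the counting that Spark performs across partitions.

```python
from collections import Counter

# Hypothetical sample records in the u.user layout (not taken from the dataset).
lines = [
    '1|25|M|student|00000',
    '2|31|F|engineer|00000',
    '3|47|M|writer|00000',
    '4|52|M|doctor|00000',
]

# Same keying step as the map() in Solution a: the gender is the third field.
gender_counts = Counter(line.split('|')[2] for line in lines)

# Turn the raw counts into percentages of the total.
total = sum(gender_counts.values())
percentages = {g: round(100 * n / total, 2) for g, n in gender_counts.items()}
print(percentages)
```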


### Solution b:

In [7]:
males = usersRdd.filter(lambda line : line.split('|')[2] == 'M').count()
females = usersRdd.filter(lambda line : line.split('|')[2] == 'F').count()

print("Male reviewers: % 5.2f %%" %(round(100*males/numberOfUsers,2)))
print("Female reviewers: %5.2f %%" %(round(100*females/numberOfUsers,2)))

Male reviewers:  71.05 %
Female reviewers: 28.95 %


### Solution c:

In [8]:
percentageRdd = usersRdd.map(lambda line : (line.split('|')[2], 1)).reduceByKey(lambda a,b : (a + b)).mapValues(lambda x: round((100 * x)/numberOfUsers,2))
percentageRdd.collect()

[('M', 71.05), ('F', 28.95)]

In [9]:
for gender, percent in percentageRdd.collect():
    # Two equivalent ways to pick the label: a conditional expression and tuple indexing.
    print(('Male' if gender == 'M' else 'Female') + " reviewers: %5.2f %%" % percent)
    print(('Male','Female')[gender == 'F'] + " reviewers: %5.2f %%" % percent)

Male reviewers: 71.05 %
Male reviewers: 71.05 %
Female reviewers: 28.95 %
Female reviewers: 28.95 %
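
Unlike Solutions a and b, Solution c keeps the counts in an RDD until `collect()` is called. What `reduceByKey` and `mapValues` do to the (key, value) pairs can be sketched in plain Python as a pairwise fold per key. The helpers `reduce_by_key` and `map_values` below are illustrative functions written for this note, not PySpark functions, and the input pairs are made up.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter


def reduce_by_key(pairs, func):
    """Fold the values of each key pairwise with func (single-machine analogue)."""
    ordered = sorted(pairs, key=itemgetter(0))
    return [
        (key, reduce(func, (value for _, value in group)))
        for key, group in groupby(ordered, key=itemgetter(0))
    ]


def map_values(pairs, func):
    """Apply func to each value and leave the keys untouched."""
    return [(key, func(value)) for key, value in pairs]


# Hypothetical (gender, 1) pairs, as produced by the map() step in Solution c.
pairs = [('M', 1), ('F', 1), ('M', 1), ('M', 1)]
counts = reduce_by_key(pairs, lambda a, b: a + b)
shares = map_values(counts, lambda n: round(100 * n / len(pairs), 2))
print(counts)
print(shares)
```

The sketch sorts the pairs only so that `groupby` can collect equal keys; it makes no claim about how Spark orders or shuffles keys internally.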


## Exercise 2: How many evaluations did our top-10 reviewers make?

### Solution a (ordering in Python)

In [10]:
# countByKey() returns a dict on the driver, so the ordering happens in plain Python.
userRatingsDict = ratingsRdd.map(lambda line: (int(line.split('\t')[0]),1)).countByKey()
sortedResults = sorted(userRatingsDict.items(), key=lambda pair: pair[1], reverse=True)
print(type(sortedResults))
for userId, numRatings in sortedResults[:10]:
    print(str(userId) + ": " + str(numRatings))

<class 'list'>
405: 737
655: 685
13: 636
450: 540
276: 518
416: 493
537: 490
303: 484
234: 480
393: 448


### Solution b (ordering in Spark)

In [11]:
userRatingsRdd = ratingsRdd.map(lambda line: (int(line.split('\t')[0]),1)).reduceByKey(lambda x, y: x+y).sortBy(lambda record: record[1], ascending = False)
userRatingsRdd.take(10)

[(405, 737),
 (655, 685),
 (13, 636),
 (450, 540),
 (276, 518),
 (416, 493),
 (537, 490),
 (303, 484),
 (234, 480),
 (393, 448)]
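
Sorting every user only to keep ten does more work than strictly needed. PySpark's `RDD.takeOrdered(num, key)` and `RDD.top(num, key)` return the first elements under an ordering without a separate `sortBy` step; whether that is noticeably faster on a dataset this small is not measured in this notebook. The idea is sketched below in plain Python with `heapq.nlargest`, using five of the (user, count) pairs printed above, deliberately listed out of order.

```python
import heapq

# (user id, number of ratings) pairs copied from the output above, shuffled.
user_counts = [(13, 636), (276, 518), (405, 737), (450, 540), (655, 685)]

# Keep only the k largest by count instead of sorting the whole collection.
top3 = heapq.nlargest(3, user_counts, key=lambda pair: pair[1])
print(top3)
```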