In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
if 'sc' not in globals():
    conf = SparkConf().setMaster('local[*]')
    sc = SparkContext(conf=conf)

For this lab we will use the movie ratings file from our MovieLens dataset. This file contains users' ratings of different movies.

In [5]:
# load the dataset into an RDD to get started
input_rdd = sc.textFile("/home/fieldengineer/Documents/courses/architect_big_data_solutions_with_spark-master/Datasets//movielens/ratings.csv")

In [6]:
# let's take a look and see what is in this data
input_rdd.take(10)

['userId,movieId,rating,timestamp',
 '1,31,2.5,1260759144',
 '1,1029,3.0,1260759179',
 '1,1061,3.0,1260759182',
 '1,1129,2.0,1260759185',
 '1,1172,4.0,1260759205',
 '1,1263,2.0,1260759151',
 '1,1287,2.0,1260759187',
 '1,1293,2.0,1260759148',
 '1,1339,3.5,1260759125']

In [7]:
# like before, we will filter out the header row and create an RDD of lists
movie_ratings_rdd = input_rdd.filter(lambda line: 'userId' not in line)
movie_ratings_rdd_rdd = movie_ratings_rdd.map(lambda x: x.split(','))
movie_ratings_rdd_rdd.take(10)

[['1', '31', '2.5', '1260759144'],
 ['1', '1029', '3.0', '1260759179'],
 ['1', '1061', '3.0', '1260759182'],
 ['1', '1129', '2.0', '1260759185'],
 ['1', '1172', '4.0', '1260759205'],
 ['1', '1263', '2.0', '1260759151'],
 ['1', '1287', '2.0', '1260759187'],
 ['1', '1293', '2.0', '1260759148'],
 ['1', '1339', '3.5', '1260759125'],
 ['1', '1343', '2.0', '1260759131']]

### Transformations on RDD of key-value Pairs
A key-value pair is a data structure consisting of two elements: a key and a value. Structuring your data as key-value pairs enables further interesting transformations, as we will see in this section.

If we want to know which movie received the lowest rating overall, we can find out with a simple key-value transformation. First we will create a key-value pair of the movie id and the user rating.

In [8]:
# create a (movie id, rating) key value pair
movie_ratings_rdd = movie_ratings_rdd_rdd.map(lambda x: (x[1],float(x[2])))

In [11]:
movie_ratings_rdd.take(2)

[('31', 2.5), ('1029', 3.0)]

In [12]:
# Let's take a look at all the distinct values in our dataset to see if there are any unexpected values we should remove
movie_ratings_rdd.map(lambda x: x[1]).distinct().collect()

[2.5, 2.0, 4.0, 4.5, 0.5, 3.0, 3.5, 1.0, 5.0, 1.5]

As the data looks good, we can proceed to summing the ratings per movie.

### reduceByKey

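The higher-order reduceByKey method takes an associative binary operator as input and reduces values with the same key to a single value using the specified binary operator. Spark also expects the operator to be commutative, because partial results from different partitions may be merged in any order.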

A binary operator takes two values as input and returns a single value as output. An associative operator returns the same result regardless of the grouping of the operands.

The reduceByKey method can be used for aggregating values by key. For example, it can be used for calculating sum, product, minimum or maximum of all the values mapped to the same key.
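To make the role of the binary operator concrete, here is a minimal pure-Python sketch of what reduceByKey computes. It is only an analogue, not Spark's implementation: the `reduce_by_key` helper and the two hand-written partitions are assumptions made for illustration. Values are first combined inside each partition and the partial results are then merged, which is why the grouping of the operands must not affect the result.

```python
def reduce_by_key(partitions, op):
    """Mimic reduceByKey: combine per key inside each partition, then merge."""
    merged = {}
    for partition in partitions:
        # Step 1: combine the values of each key within a single partition.
        partial = {}
        for key, value in partition:
            partial[key] = op(partial[key], value) if key in partial else value
        # Step 2: merge this partition's partial results into the overall result.
        for key, value in partial.items():
            merged[key] = op(merged[key], value) if key in merged else value
    return merged

# Hypothetical (movie id, rating) pairs spread over two partitions.
partitions = [
    [('31', 2.5), ('1029', 3.0), ('31', 4.0)],
    [('1029', 2.0), ('31', 1.5)],
]

ratings_sum = reduce_by_key(partitions, lambda value1, value2: value1 + value2)
ratings_max = reduce_by_key(partitions, max)

print(ratings_sum)
print(ratings_max)
```

Swapping the operator (sum, `max`, `min`, product) changes the aggregation without changing the surrounding logic.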

In [13]:
# reduce by key (movie id) to get the sum of all ratings per movie
ratings_sum_rdd = movie_ratings_rdd.reduceByKey(lambda value1, value2: value1 + value2)

In [16]:
ratings_sum_rdd.take(10)

[('1129', 159.0),
 ('1293', 183.0),
 ('1339', 171.5),
 ('1405', 139.5),
 ('2105', 163.5),
 ('2150', 126.5),
 ('2455', 159.5),
 ('10', 421.0),
 ('17', 337.5),
 ('50', 878.5)]

In [19]:
# print out movie ids with a low rating sum
ratings_sum_rdd = ratings_sum_rdd.filter(lambda x: x[1]<1.5)
sorted(ratings_sum_rdd.collect(), key=lambda x: x[1])

[('88950', 0.5),
 ('152057', 0.5),
 ('27376', 0.5),
 ('6298', 0.5),
 ('31290', 0.5),
 ('47815', 0.5),
 ('48591', 0.5),
 ('66066', 0.5),
 ('39408', 0.5),
 ('66659', 0.5),
 ('6284', 0.5),
 ('7282', 0.5),
 ('7093', 0.5),
 ('7448', 0.5),
 ('5850', 0.5),
 ('25737', 0.5),
 ('25826', 0.5),
 ('25901', 0.5),
 ('26188', 0.5),
 ('32844', 0.5),
 ('53038', 0.5),
 ('54910', 0.5),
 ('60990', 0.5),
 ('69974', 0.5),
 ('78122', 0.5),
 ('95740', 0.5),
 ('4684', 0.5),
 ('6109', 0.5),
 ('26157', 0.5),
 ('2191', 0.5),
 ('3883', 0.5),
 ('4753', 0.5),
 ('5278', 0.5),
 ('5413', 0.5),
 ('5864', 0.5),
 ('7312', 0.5),
 ('32153', 0.5),
 ('46574', 0.5),
 ('54768', 0.5),
 ('61465', 0.5),
 ('3933', 0.5),
 ('5356', 0.5),
 ('6514', 0.5),
 ('8290', 0.5),
 ('125916', 0.5),
 ('90870', 0.5),
 ('95443', 0.5),
 ('108689', 0.5),
 ('159972', 0.5),
 ('161155', 0.5),
 ('8811', 0.5),
 ('58146', 0.5),
 ('68965', 0.5),
 ('70946', 0.5),
 ('116939', 0.5),
 ('2845', 0.5),
 ('6872', 0.5),
 ('59834', 0.5),
 ('70121', 0.5),
 ('71876', 0.

### groupByKey

The groupByKey method returns an RDD of pairs, where the first element in a pair is a key from the source RDD and the second element is a collection of all the values that have the same key. It is similar to the groupBy method that we saw earlier. The difference is that groupBy is a higher-order method that takes as input a function that returns a key for each element in the source RDD. The groupByKey method operates on an RDD of key-value pairs, so a key generator function is not required as input.

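**The groupByKey method should be avoided where possible. It is an expensive operation because it shuffles every key-value pair across the network, whereas reduceByKey first combines values within each partition and shuffles only the partial results. For most use cases, better alternatives are available.**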

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
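The shape of the groupByKey result can be illustrated with a small pure-Python analogue. This is a sketch under assumptions: `group_by_key` is a hypothetical helper and the pairs are made-up sample data. It shows that every value is kept until after the grouping, which is the reason the Spark version has to move all of them across the cluster.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Mimic groupByKey: collect every value that shares a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)

# Hypothetical (movie id, rating) pairs.
pairs = [('31', 2.5), ('1029', 3.0), ('31', 4.0), ('1029', 2.0)]

grouped = group_by_key(pairs)

# Aggregating after grouping gives the same sums that a per-key reduce would,
# but only after all individual values have been gathered first.
sums_from_groups = {key: sum(values) for key, values in grouped.items()}

print(grouped)
print(sums_from_groups)
```

In PySpark the corresponding call would be along the lines of `movie_ratings_rdd.groupByKey().mapValues(list)`, which is worth using only when the full list of values per key is really needed.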

### join
The join method takes an RDD of key-value pairs as input and performs an inner join on the source and input RDDs. It returns an RDD of pairs, where the first element in a pair is a key found in both source and input RDD and the second element is a tuple containing values mapped to that key in the source and input RDD.
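The inner-join semantics described above can be sketched in plain Python. The `inner_join` helper is hypothetical, and the movie id `'99999'` is made up to show a key that exists on one side only; the other values are taken from the outputs in this notebook.

```python
def inner_join(left_pairs, right_pairs):
    """Mimic join: keep only keys present on both sides, pairing up their values."""
    # Index the right-hand side by key so each left element can look up its matches.
    right_index = {}
    for key, value in right_pairs:
        right_index.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left_pairs
        for right_value in right_index.get(key, [])
    ]

ratings = [('10', 421.0), ('99999', 1.0)]   # '99999' has no matching name
names = [('10', 'GoldenEye (1995)'), ('1', 'Toy Story (1995)')]

joined = inner_join(ratings, names)
print(joined)
```

Keys that appear in only one of the two inputs (`'99999'` and `'1'` here) are dropped from the result.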

In [20]:
# We want to get the movie names, not just the ids
ratings_sum_rdd.first()

('5363', 1.0)

In [27]:
# lucky for us, we have a table that translates these ids into actual movie names
movies_names_rdd = sc.textFile("/home/fieldengineer/Documents/courses/architect_big_data_solutions_with_spark-master/Datasets//movielens/movies.csv")\
.filter(lambda x: 'movieId' not in x)\
.map(lambda x: x.split(','))\
.map(lambda x: (x[0],x[1]))

In [28]:
movies_names_rdd.take(10)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)'),
 ('6', 'Heat (1995)'),
 ('7', 'Sabrina (1995)'),
 ('8', 'Tom and Huck (1995)'),
 ('9', 'Sudden Death (1995)'),
 ('10', 'GoldenEye (1995)')]

In [29]:
# We can use join to translate the ids into names
ratings_sum_rdd.join(movies_names_rdd).take(10)

[('6547', (1.0, 'Northfork (2003)')),
 ('7199', (1.0, 'Melvin Goes to Dinner (2003)')),
 ('65802', (1.0, 'Paul Blart: Mall Cop (2009)')),
 ('152057', (0.5, 'Miles Ahead (2016)')),
 ('155820', (1.0, 'Keanu (2016)')),
 ('160080', (1.0, 'Ghostbusters (2016)')),
 ('27376', (0.5, '"Tunnel')),
 ('48660', (1.0, '"Elementary Particles')),
 ('31290', (0.5, 'Beastmaster 2: Through the Portal of Time (1991)')),
 ('31431', (1.0, 'Boogeyman (2005)'))]
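Names such as `'"Tunnel'` in the output above are cut off because `split(',')` also splits on commas inside quoted titles. A minimal sketch of one possible fix, using Python's standard `csv` module to parse each line: the `parse_movie_line` helper and the sample line are assumptions for illustration, not part of the dataset.

```python
import csv

def parse_movie_line(line):
    """Parse one CSV line and return (movie id, title), honouring quoted fields."""
    fields = next(csv.reader([line]))
    return (fields[0], fields[1])

# Hypothetical line in the same layout as movies.csv, with a comma inside the title.
sample_line = '999,"Example Title, The (2000)",Drama'

naive_title = sample_line.split(',')[1]
parsed = parse_movie_line(sample_line)

print(naive_title)
print(parsed)
```

Assuming the file has no quoted fields that span multiple lines, the helper could replace the two `map` steps used to build `movies_names_rdd`, for example `.map(parse_movie_line)`.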

In [30]:
# we can keep only the name of the movie and the rating sum
movie_ratings_sum = ratings_sum_rdd.join(movies_names_rdd).map(lambda line: (line[1][1], line[1][0])).collect()

In [31]:
# let's print it sorted by rating sum
sorted(movie_ratings_sum, key=lambda x: x[1])

[('Miles Ahead (2016)', 0.5),
 ('"Tunnel', 0.5),
 ('Beastmaster 2: Through the Portal of Time (1991)', 0.5),
 ('"Grudge 3', 0.5),
 ("Tyler Perry's Madea Goes to Jail (2009)", 0.5),
 ('DysFunktional Family (2003)', 0.5),
 ('"Front Page', 0.5),
 ('Envy (2004)', 0.5),
 ('Road Games (a.k.a. Roadgames) (1981)', 0.5),
 ('Libeled Lady (1936)', 0.5),
 ('"End of Summer', 0.5),
 ('Shiver (Eskalofrío) (2008)', 0.5),
 ('Worth Winning (1989)', 0.5),
 ('One from the Heart (1982)', 0.5),
 ('Zombie Holocaust (a.k.a. Doctor Butcher M.D.) (Zombi Holocaust) (1980)',
  0.5),
 ('"Tarzan', 0.5),
 ('"Follow Me', 0.5),
 ('Once Upon a Forest (1993)', 0.5),
 ('"OH in Ohio', 0.5),
 ('Bangkok Dangerous (2008)', 0.5),
 ('"Giant Spider Invasion', 0.5),
 ('Fifty Shades of Grey (2015)', 0.5),
 ('Trespass (2011)', 0.5),
 ('"I', 0.5),
 ('Sunspring (2016)', 0.5),
 ('Yu-Gi-Oh! (2004)', 0.5),
 ('Witless Protection (2008)', 0.5),
 ('Troll 2 (1990)', 0.5),
 ('Werner - Beinhart! (1990)', 0.5),
 ('"House of the Dead', 0.5),
 

### Can we really say these are the worst movies? 
#### No: we should also consider how many people voted, not just the total sum!
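A tiny pure-Python illustration of why the sum is misleading. The two movies and their ratings are invented toy data, not taken from the dataset.

```python
# Toy data: movie_a was rated once, movie_b was rated four times.
toy_ratings = {
    'movie_a': [1.0],
    'movie_b': [0.5, 0.5, 0.5, 0.5],
}

sums = {movie: sum(values) for movie, values in toy_ratings.items()}
averages = {movie: sum(values) / len(values) for movie, values in toy_ratings.items()}

# Ranking by sum and ranking by average disagree about the "worst" movie.
lowest_by_sum = min(sums, key=sums.get)
lowest_by_average = min(averages, key=averages.get)

print(lowest_by_sum, sums)
print(lowest_by_average, averages)
```

A low sum mostly signals that few people rated the movie; dividing by the number of ratings removes that effect.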

In [22]:
# This is your first exercise. Find the average rating per movie and print out a few of the lowest ones!
# HINT: Create another RDD containing the number of ratings for each movie, then join it with the ratings sum RDD