In [2]:
import sys
import os

# Put the Spark 2 client's Python package and its bundled py4j archive at the
# front of the module search path so that `import pyspark` below can find them.
sys.path.insert(0, '/usr/hdp/current/spark2-client/python')
sys.path.insert(0, '/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip')

# Environment variables read by Spark: installation directory, configuration
# directory, and the Python interpreter to use for PySpark.
os.environ['SPARK_HOME'] = '/usr/hdp/current/spark2-client/'
os.environ['SPARK_CONF_DIR'] = '/etc/spark2/conf'
os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'

# Imported here, not at the top, because it relies on the sys.path entries above.
import pyspark
conf = pyspark.SparkConf()
conf.setMaster("yarn")  # run on the YARN cluster manager
conf.set("spark.driver.memory","1g")
conf.set("spark.executor.instances", "3")  # request three executors
conf.set("spark.executor.memory","2g")  # memory per executor
conf.set("spark.executor.cores","1")  # cores per executor

# The SparkContext that every later cell refers to as `sc`.
sc = pyspark.SparkContext(conf=conf)

In [3]:
sc

### Movie Ratings

An independent movie company is looking to invest in a new movie project. With limited finances, the company wants to 
analyze the reactions of audiences, particularly toward various movie genres, in order to identify a 
movie project to focus on which will help the business earn more profit. The company relies on data collected from a publicly available recommendation service by [MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This [dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **24,404,096** ratings and **668,953** tags applied across **40,110** movies. This data was created by **247,753** users between January 09, 1995 and January 29, 2016. This dataset was generated on October 17, 2016. 

From this dataset, several analyses are possible, including the following:
1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for an in-depth marketing analysis.

These types of analyses, which are somewhat ambiguous, demand the ability to quickly process large amounts of data in 
a relatively short amount of time for justifying business decisions. In these situations, the size of the data typically makes analysis done on a single machine impossible and analysis done using a remote storage system impractical. For the remainder of the lessons, we will learn how HDFS provides the basis to store a massive amount of data and to enable the programming approach to analyze this data.

In [1]:
!hdfs dfs -put ../data/ml-latest-small ./
!hdfs dfs -ls
!hdfs dfs -ls ./ml-latest-small

Found 3 items
drwxr-xr-x   - lngo hadoop          0 2019-12-22 23:44 .sparkStaging
drwxr-xr-x   - lngo hadoop          0 2019-12-22 23:47 ml-latest-small
drwxr-xr-x   - lngo hadoop          0 2019-12-22 23:40 text
Found 5 items
-rw-r--r--   3 lngo hadoop       8342 2019-12-22 23:47 ml-latest-small/README.txt
-rw-r--r--   3 lngo hadoop     188236 2019-12-22 23:47 ml-latest-small/links.csv
-rw-r--r--   3 lngo hadoop     484688 2019-12-22 23:47 ml-latest-small/movies.csv
-rw-r--r--   3 lngo hadoop    2382886 2019-12-22 23:47 ml-latest-small/ratings.csv
-rw-r--r--   3 lngo hadoop     114976 2019-12-22 23:47 ml-latest-small/tags.csv


In [4]:
# Preview the first 100 lines of movies.csv straight from HDFS.
# stderr is discarded; `!!` returns the command output as a list of lines.
!!hdfs dfs -cat ml-latest-small/movies.csv \
    2>/dev/null | head -n 100

['movieId,title,genres',
 '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
 '5,Father of the Bride Part II (1995),Comedy',
 '6,Heat (1995),Action|Crime|Thriller',
 '7,Sabrina (1995),Comedy|Romance',
 '8,Tom and Huck (1995),Adventure|Children',
 '9,Sudden Death (1995),Action',
 '10,GoldenEye (1995),Action|Adventure|Thriller',
 '11,"American President, The (1995)",Comedy|Drama|Romance',
 '12,Dracula: Dead and Loving It (1995),Comedy|Horror',
 '13,Balto (1995),Adventure|Animation|Children',
 '14,Nixon (1995),Drama',
 '15,Cutthroat Island (1995),Action|Adventure|Romance',
 '16,Casino (1995),Crime|Drama',
 '17,Sense and Sensibility (1995),Drama|Romance',
 '18,Four Rooms (1995),Comedy',
 '19,Ace Ventura: When Nature Calls (1995),Comedy',
 '20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller',
 '21,Get Shorty (1995),Comedy|Crime

In [5]:
# Preview the first 100 lines of ratings.csv straight from HDFS.
# stderr is discarded; `!!` returns the command output as a list of lines.
!!hdfs dfs -cat ml-latest-small/ratings.csv \
    2>/dev/null | head -n 100

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815',
 '1,50,5.0,964982931',
 '1,70,3.0,964982400',
 '1,101,5.0,964980868',
 '1,110,4.0,964982176',
 '1,151,5.0,964984041',
 '1,157,5.0,964984100',
 '1,163,5.0,964983650',
 '1,216,5.0,964981208',
 '1,223,3.0,964980985',
 '1,231,5.0,964981179',
 '1,235,4.0,964980908',
 '1,260,5.0,964981680',
 '1,296,3.0,964982967',
 '1,316,3.0,964982310',
 '1,333,5.0,964981179',
 '1,349,4.0,964982563',
 '1,356,4.0,964980962',
 '1,362,5.0,964982588',
 '1,367,4.0,964981710',
 '1,423,3.0,964982363',
 '1,441,4.0,964980868',
 '1,457,5.0,964981909',
 '1,480,4.0,964982346',
 '1,500,3.0,964981208',
 '1,527,5.0,964984002',
 '1,543,4.0,964981179',
 '1,552,4.0,964982653',
 '1,553,5.0,964984153',
 '1,590,4.0,964982546',
 '1,592,4.0,964982271',
 '1,593,4.0,964983793',
 '1,596,5.0,964982838',
 '1,608,5.0,964982931',
 '1,648,3.0,964982563',
 '1,661,5.0,964982838',
 '1,673,3.0,964981775',
 '1,733,4.0,9

In [6]:
# RDD with one string element per line of ratings.csv (header line included).
ratings = sc.textFile("./ml-latest-small/ratings.csv")

In [7]:
# Mark the RDD for caching so that repeated actions (the timed counts below)
# can reuse the data instead of re-reading the file.
ratings.cache()

./ml-latest-small/ratings.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [8]:
%%time
ratings.count()

CPU times: user 12.1 ms, sys: 11.4 ms, total: 23.5 ms
Wall time: 5.49 s


100837

In [9]:
%%time
ratings.count()

CPU times: user 10.5 ms, sys: 2.63 ms, total: 13.1 ms
Wall time: 946 ms


100837

In [10]:
%%time
ratings.count()

CPU times: user 6.41 ms, sys: 6.25 ms, total: 12.7 ms
Wall time: 1.29 s


100837

### 4.1 Find movies which have the highest average ratings over the years and identify the corresponding genre.

- Find the average ratings of all movies over the years
- Identify the corresponding genres for each movie

In [11]:
ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815']

In [12]:
ratingHeader = ratings.first() #extract header
print(ratingHeader)

userId,movieId,rating,timestamp


In [13]:
# Drop the CSV header: keep only the lines that differ from ratingHeader.
ratingsOnly = ratings.filter(lambda line: line != ratingHeader)

In [14]:
ratingsOnly.take(5)

['1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815',
 '1,50,5.0,964982931']

In [15]:
# Build (movieId, rating) pairs: split each "userId,movieId,rating,timestamp"
# line into fields, then keep field 1 as the key and field 2 as a float value.
movieRatings = (
    ratingsOnly
    .map(lambda line: line.split(","))
    .map(lambda fields: (fields[1], float(fields[2])))
)

In [16]:
movieRatings.take(5)

[('1', 4.0), ('3', 4.0), ('6', 4.0), ('47', 5.0), ('50', 5.0)]

**Possible approaches in aggregating data:** 
- groupByKey and mapValues
- reduceByKey and countByKey

**groupByKey and mapValues**

In [17]:
# Gather all ratings that share a movieId; each value is an iterable of ratings
# (displayed as a ResultIterable object until it is materialized).
groupByKeyRatings = movieRatings.groupByKey()
groupByKeyRatings.take(5)

[('7451', <pyspark.resultiterable.ResultIterable at 0x7fd3bf504610>),
 ('8368', <pyspark.resultiterable.ResultIterable at 0x7fd3bf504b10>),
 ('8528', <pyspark.resultiterable.ResultIterable at 0x7fd3bf504150>),
 ('8636', <pyspark.resultiterable.ResultIterable at 0x7fd3bf5048d0>),
 ('8665', <pyspark.resultiterable.ResultIterable at 0x7fd3d41a27d0>)]

In [18]:
mapValuesToListRatings = groupByKeyRatings.mapValues(list)
mapValuesToListRatings.take(5)

[('7451',
  [3.5,
   3.5,
   3.5,
   4.5,
   4.5,
   4.0,
   3.0,
   3.5,
   4.5,
   3.0,
   3.5,
   4.0,
   4.5,
   4.5,
   2.5,
   3.5,
   5.0,
   3.0,
   3.5,
   3.5,
   4.0,
   3.5,
   4.5,
   4.0,
   3.5,
   3.0,
   2.5,
   3.5,
   4.0,
   4.5,
   5.0,
   4.0,
   4.5,
   3.0,
   4.0,
   2.5,
   5.0,
   3.0,
   3.5]),
 ('8368',
  [4.0,
   2.5,
   4.5,
   3.0,
   4.5,
   3.0,
   4.0,
   4.5,
   5.0,
   3.5,
   5.0,
   4.5,
   4.5,
   4.5,
   4.5,
   5.0,
   4.5,
   4.5,
   5.0,
   5.0,
   5.0,
   4.0,
   3.0,
   3.5,
   4.0,
   4.5,
   4.5,
   3.5,
   4.5,
   4.0,
   4.0,
   3.0,
   4.5,
   4.5,
   5.0,
   4.0,
   3.5,
   4.0,
   4.0,
   4.0,
   4.0,
   4.5,
   2.5,
   3.5,
   4.0,
   3.0,
   4.5,
   3.5,
   5.0,
   4.5,
   2.0,
   4.0,
   4.0,
   5.0,
   4.0,
   4.0,
   3.0,
   4.0,
   3.0,
   3.0,
   4.0,
   4.0,
   4.0,
   4.5,
   3.5,
   5.0,
   4.0,
   3.5,
   4.0,
   2.0,
   2.5,
   4.5,
   3.5,
   4.0,
   2.5,
   3.5,
   4.0,
   3.5,
   4.0,
   4.0,
   3.0,
   4.0,
   4.0,
  

In [19]:
# Average per movie: total of the movie's rating list divided by its length.
avgRatings01 = mapValuesToListRatings.mapValues(
    lambda ratingList: sum(ratingList) / float(len(ratingList))
)
avgRatings01.take(5)

[('7451', 3.7564102564102564),
 ('8368', 3.913978494623656),
 ('8528', 3.551282051282051),
 ('8636', 3.8037974683544302),
 ('8665', 3.7866666666666666)]

Is this correct?

In [20]:
test = [4.5, 3.5, 3.5, 4.0, 3.0, 3.5]
sum(test) / len(test)

3.6666666666666665

**reduceByKey and countByKey**

In [21]:
# Number of ratings per movieId, returned to the notebook as a dict-like
# object (see the defaultdict in the output) rather than as an RDD.
countsByKey = movieRatings.countByKey()
countsByKey

defaultdict(int,
            {'1': 215,
             '3': 52,
             '6': 102,
             '47': 203,
             '50': 204,
             '70': 55,
             '101': 23,
             '110': 237,
             '151': 44,
             '157': 11,
             '163': 66,
             '216': 49,
             '223': 104,
             '231': 133,
             '235': 70,
             '260': 251,
             '296': 307,
             '316': 140,
             '333': 50,
             '349': 110,
             '356': 329,
             '362': 34,
             '367': 157,
             '423': 10,
             '441': 42,
             '457': 190,
             '480': 238,
             '500': 144,
             '527': 220,
             '543': 41,
             '552': 61,
             '553': 65,
             '590': 164,
             '592': 189,
             '593': 279,
             '596': 60,
             '608': 181,
             '648': 162,
             '661': 49,
             '673': 53,
          

In [22]:
def sumValues(x, y):
    """Return x + y; used below as the combining function for reduceByKey."""
    total = x + y
    return total

sumRatings = movieRatings.reduceByKey(sumValues)
sumRatings.take(5)

[('7451', 146.5),
 ('8368', 364.0),
 ('8528', 138.5),
 ('8636', 300.5),
 ('8665', 284.0)]

In [23]:
import operator

# Same reduction as with sumValues: operator.add is the standard-library
# function form of `+`, so no hand-written helper is needed.
sumRatings = movieRatings.reduceByKey(operator.add)
sumRatings.take(5)

[('7451', 146.5),
 ('8368', 364.0),
 ('8528', 138.5),
 ('8636', 300.5),
 ('8665', 284.0)]

In [24]:
# Average per movie: divide each movie's rating total by the number of
# ratings recorded for that movieId in countsByKey.
avgRatings02 = sumRatings.map(
    lambda pair: (pair[0], pair[1] / countsByKey.get(pair[0]))
)
avgRatings02.take(5)

[('7451', 3.7564102564102564),
 ('8368', 3.913978494623656),
 ('8528', 3.551282051282051),
 ('8636', 3.8037974683544302),
 ('8665', 3.7866666666666666)]

How do we augment movie ratings data with title information?

In [25]:
movies = sc.textFile("./ml-latest-small/movies.csv")

In [26]:
movieHeader = movies.first() #extract header
print(movieHeader)

movieId,title,genres


In [27]:
movies = movies.filter(lambda x: x != movieHeader)
movies.take(5)

['1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
 '5,Father of the Bride Part II (1995),Comedy']

**NOTE:** This is not a good way to handle CSV parsing since some strings may contain commas. Instead, consider using a CSV library supported by Spark. We will show this in the next notebook in the series. The following will work for now but is difficult to understand.

In [28]:
def parseMovieLine(line):
    """Convert one movies.csv line into (movieId, (title, genres)).

    The line is cut at its last comma to isolate the genres, and the remainder
    is cut at its first comma to isolate the movieId. Everything in between is
    the title, so a title containing commas stays in one piece (still wrapped
    in its original double quotes).
    """
    parts = line.rsplit(",", 1)           # ["movieId,title", "genres"]
    idAndTitle = parts[0].split(",", 1)   # ["movieId", "title"]
    return (idAndTitle[0], (idAndTitle[1], parts[1]))

movieInfo = movies.map(parseMovieLine)
movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]

In [29]:
augmentedRatings = avgRatings01.join(movieInfo)
augmentedRatings.take(5)

[('4',
  (2.357142857142857, ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance'))),
 ('10',
  (3.496212121212121, ('GoldenEye (1995)', 'Action|Adventure|Thriller'))),
 ('12',
  (2.4210526315789473,
   ('Dracula: Dead and Loving It (1995)', 'Comedy|Horror'))),
 ('16', (3.926829268292683, ('Casino (1995)', 'Crime|Drama'))),
 ('20', (2.5, ('Money Train (1995)', 'Action|Comedy|Crime|Drama|Thriller')))]

*Movies with the highest average rating:*

In [30]:
augmentedRatings.takeOrdered(10, key = lambda x : -x[1][0])

[('53', (5.0, ('Lamerica (1994)', 'Adventure|Drama'))),
 ('496',
  (5.0, ('What Happened Was... (1994)', 'Comedy|Drama|Romance|Thriller'))),
 ('633', (5.0, ('Denise Calls Up (1995)', 'Comedy'))),
 ('1151', (5.0, ('Lesson Faust (1994)', 'Animation|Comedy|Drama|Fantasy'))),
 ('3073', (5.0, ('"Sandpiper, The (1965)"', 'Drama|Romance'))),
 ('3096', (5.0, ('My Man Godfrey (1957)', 'Comedy'))),
 ('3303',
  (5.0,
   ('Black Tar Heroin: The Dark End of the Street (2000)', 'Documentary'))),
 ('3939', (5.0, ('Slumber Party Massacre II (1987)', 'Horror'))),
 ('4788',
  (5.0,
   ('Moscow Does Not Believe in Tears (Moskva slezam ne verit) (1979)',
    'Drama|Romance'))),
 ('5416', (5.0, ('Cherish (2002)', 'Comedy|Drama|Thriller')))]

*Movies with the lowest average rating:*

In [31]:
augmentedRatings.takeOrdered(10, key = lambda x : x[1][0])

[('107013', (0.5, ('"Christmas Carol, A (1977)"', 'Drama|Fantasy'))),
 ('160872', (0.5, ('Satanic (2016)', 'Horror'))),
 ('6371', (0.5, ('Pokémon Heroes (2003)', 'Animation|Children'))),
 ('7312', (0.5, ('"Follow Me, Boys! (1966)"', 'Comedy|Drama'))),
 ('184641',
  (0.5, ('Fullmetal Alchemist 2018 (2017)', 'Action|Adventure|Fantasy'))),
 ('83601', (0.5, ('Amer (2009)', 'Drama|Horror'))),
 ('31422', (0.5, ('Are We There Yet? (2005)', 'Children|Comedy'))),
 ('31424',
  (0.5, ('Alone in the Dark (2005)', 'Action|Horror|Sci-Fi|Thriller'))),
 ('89386', (0.5, ('Pearl Jam Twenty (2011)', 'Documentary|Musical'))),
 ('72696', (0.5, ('Old Dogs (2009)', 'Comedy')))]

### Challenge:

Make appropriate changes so that only movies with average ratings higher than 3.75 and number of ratings totalling at least 1000 are collected.

### 4.2 Find genres which have the highest average ratings over the years

- Identify the genres associated with a movie and its rating
- Each movie can have multiple genres. How to flip the Key/Value pair?

In [32]:
movieRatings.take(5)

[('1', 4.0), ('3', 4.0), ('6', 4.0), ('47', 5.0), ('50', 5.0)]

In [33]:
movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]

In [34]:
augmentedInfo = movieRatings.join(movieInfo)

In [35]:
augmentedInfo.take(5)

[('50', (5.0, ('"Usual Suspects, The (1995)"', 'Crime|Mystery|Thriller'))),
 ('50', (4.0, ('"Usual Suspects, The (1995)"', 'Crime|Mystery|Thriller'))),
 ('50', (1.0, ('"Usual Suspects, The (1995)"', 'Crime|Mystery|Thriller'))),
 ('50', (4.5, ('"Usual Suspects, The (1995)"', 'Crime|Mystery|Thriller'))),
 ('50', (5.0, ('"Usual Suspects, The (1995)"', 'Crime|Mystery|Thriller')))]

In [36]:
def extractGenreRating(t):
    """Expand one joined record into a list of (genre, rating) pairs.

    `t` has the shape (movieId, (rating, (title, genres))), where genres is a
    "|"-separated string; the rating is repeated once for every genre.
    """
    genreNames = t[1][1][1].split("|")
    return [(name, t[1][0]) for name in genreNames]
print(extractGenreRating((u'1', (3.0, (u'Toy Story (1995)', u'Adventure|Animation|Children|Comedy|Fantasy')))))

[('Adventure', 3.0), ('Animation', 3.0), ('Children', 3.0), ('Comedy', 3.0), ('Fantasy', 3.0)]


In [37]:
genreRatings = augmentedInfo.flatMap(extractGenreRating)

In [38]:
# Number of ratings per genre, returned to the notebook as a dict-like object.
# NOTE(review): this rebinds countsByKey, which until now held the per-movie
# counts read by the lambda that builds avgRatings02; re-running that earlier
# cell after this one would look movie ids up in the genre counts instead.
countsByKey = genreRatings.countByKey()
countsByKey

defaultdict(int,
            {'Adventure': 24161,
             'Fantasy': 11834,
             'IMAX': 4145,
             'Comedy': 39053,
             'Action': 30635,
             'Sci-Fi': 17243,
             'Romance': 18124,
             'Animation': 6988,
             'Children': 9208,
             'Drama': 41928,
             'Thriller': 26452,
             'Crime': 16681,
             'War': 4859,
             'Mystery': 7674,
             'Western': 1930,
             'Musical': 4138,
             'Horror': 7291,
             'Film-Noir': 870,
             'Documentary': 1219,
             '(no genres listed)': 47})

### Challenge:

Complete the remainder of the steps to find the average rating of each genre. 

In [39]:
sc.stop()