# <center> Introduction to Spark In-Memory Computing via Python PySpark </center>

In [5]:
!module list

Currently Loaded Modulefiles:
  1) anaconda3/4.2.0   2) matlab/2015a      3) zeromq/4.1.5


In [1]:
import sys
import os

sys.path.insert(0, '/usr/hdp/2.6.0.3-8/spark2/python')
sys.path.insert(0, '/usr/hdp/2.6.0.3-8/spark2/python/lib/py4j-0.10.4-src.zip')

os.environ['SPARK_HOME'] = '/usr/hdp/2.6.0.3-8/spark2/'
os.environ['SPARK_CONF_DIR'] = '/etc/hadoop/synced_conf/spark2/'
os.environ['PYSPARK_PYTHON'] = '/software/anaconda3/4.2.0/bin/python'

import pyspark
conf = pyspark.SparkConf()
conf.setMaster("yarn")
conf.set("spark.driver.memory","4g")
conf.set("spark.executor.memory","60g")
conf.set("spark.executor.instances","3")
conf.set("spark.executor.cores","12")

sc = pyspark.SparkContext(conf=conf)

In [2]:
sc

<pyspark.context.SparkContext at 0x2b8643fc8dd8>

### Movie Ratings

An independent movie company is looking to invest in a new movie project. With a limited budget, the company wants to 
analyze audience reactions, particularly toward various movie genres, in order to identify the most promising 
movie project to focus on. The company relies on data collected from a publicly available recommendation service, 
[MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This 
[dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **24404096** ratings and **668953** 
tag applications across **40110** movies. These data were created by **259137** users between January 09, 1995 and October 17, 2016. The dataset was generated on October 18, 2016. 

From this dataset, several analyses are possible, including the following:
1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for in-depth marketing analysis.

These types of analyses, which are somewhat ambiguous, demand the ability to process large amounts of data in a 
relatively short amount of time for decision-support purposes. In these situations, the size of the data typically 
makes analysis on a single machine impossible and analysis using a remote storage system impractical. For the 
remainder of the lessons, we will learn how HDFS provides the basis for storing massive amounts of data and enables 
the programming approach used to analyze these data.

In [1]:
!hdfs dfs -ls /repository/movielens

Found 7 items
-rw-r--r--   2 lngo hdfs-user       9511 2017-03-15 09:49 /repository/movielens/README.txt
-rw-r--r--   2 lngo hdfs-user  333365341 2017-03-15 09:49 /repository/movielens/genome-scores.csv
-rw-r--r--   2 lngo hdfs-user      18103 2017-03-15 09:49 /repository/movielens/genome-tags.csv
-rw-r--r--   2 lngo hdfs-user     859311 2017-03-15 09:49 /repository/movielens/links.csv
-rw-r--r--   2 lngo hdfs-user    2007982 2017-03-15 09:49 /repository/movielens/movies.csv
-rw-r--r--   2 lngo hdfs-user  663420664 2017-03-15 09:49 /repository/movielens/ratings.csv
-rw-r--r--   2 lngo hdfs-user   24032991 2017-03-15 09:49 /repository/movielens/tags.csv


In [30]:
!hdfs dfs -cat  /repository/movielens/README.txt

Summary

This dataset (ml-latest) describes 5-star rating and free-text tagging activity from [MovieLens](http://movielens.org), a movie recommendation service. It contains 24404096 ratings and 668953 tag applications across 40110 movies. These data were created by 259137 users between January 09, 1995 and October 17, 2016. This dataset was generated on October 18, 2016.

Users were selected at random for inclusion. All selected users had rated at least 1 movies. No demographic information is included. Each user is represented by an id, and no other information is provided.

The data are contained in the files `genome-scores.csv`, `genome-tags.csv`, `links.csv`, `movies.csv`, `ratings.csv` and `tags.csv`. More details about the contents and use of all these files follows.

This is a *development* dataset. As such, it may change over time and is not an appropriate dataset for shared research results. See available *benchmark* datasets if that is your intent.

This and other Gr

In [31]:
!hdfs dfs -cat  /repository/movielens/links.csv \
    2>/dev/null | head -n 5

movieId,imdbId,tmdbId
1,0114709,862
2,0113497,8844
3,0113228,15602
4,0114885,31357


In [32]:
!hdfs dfs -cat  /repository/movielens/movies.csv \
    2>/dev/null | head -n 5

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance


In [33]:
!hdfs dfs -cat  /repository/movielens/ratings.csv \
    2>/dev/null | head -n 5

userId,movieId,rating,timestamp
1,122,2.0,945544824
1,172,1.0,945544871
1,1221,5.0,945544788
1,1441,4.0,945544871


In [34]:
!hdfs dfs -cat  /repository/movielens/tags.csv \
    2>/dev/null | head -n 5

userId,movieId,tag,timestamp
28,63062,angelina jolie,1263047558
40,4973,Poetic,1436439070
40,117533,privacy,1436439140
57,356,life positive,1291771526


In [38]:
ratings = sc.textFile("/repository/movielens/ratings.csv")

In [39]:
ratings.cache()

/repository/movielens/ratings.csv MapPartitionsRDD[33] at textFile at NativeMethodAccessorImpl.java:0

In [40]:
%%time
ratings.count()

CPU times: user 10.1 ms, sys: 4.38 ms, total: 14.5 ms
Wall time: 13.2 s


24404097

In [43]:
%%time
ratings.count()

CPU times: user 9 ms, sys: 4 ms, total: 13 ms
Wall time: 2.99 s


24404097

In [55]:
%%time
ratings.count()

CPU times: user 9 ms, sys: 3 ms, total: 12 ms
Wall time: 3.05 s


24404097

### 4.1 Find movies which have the highest average ratings over the years and identify the corresponding genre

- Find the average ratings of all movies over the years
- Identify the corresponding genres for each movie

In [45]:
ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,122,2.0,945544824',
 '1,172,1.0,945544871',
 '1,1221,5.0,945544788',
 '1,1441,4.0,945544871']

In [46]:
ratingHeader = ratings.first() #extract header
print(ratingHeader)

userId,movieId,rating,timestamp


In [47]:
ratingsOnly = ratings.filter(lambda x:x != ratingHeader)

In [48]:
ratingsOnly.take(5)

['1,122,2.0,945544824',
 '1,172,1.0,945544871',
 '1,1221,5.0,945544788',
 '1,1441,4.0,945544871',
 '1,1609,3.0,945544824']

In [49]:
movieRatings = ratingsOnly.map(lambda line: (line.split(",")[1], float(line.split(",")[2])))

In [50]:
movieRatings.take(5)

[('122', 2.0), ('172', 1.0), ('1221', 5.0), ('1441', 4.0), ('1609', 3.0)]

**Possible approaches to aggregating data:** 
- groupByKey and mapValues
- reduceByKey and countByKey

**groupByKey and mapValues**

In [51]:
groupByKeyRatings = movieRatings.groupByKey()

groupByKeyRatings.take(5)

[('154214', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ad588>),
 ('27479', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ad710>),
 ('129667', <pyspark.resultiterable.ResultIterable at 0x2ba1e24adf28>),
 ('140054', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ad780>),
 ('45183', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ade10>)]

In [52]:
mapValuesToListRatings = groupByKeyRatings.mapValues(list)
mapValuesToListRatings.take(5)

[('154214', [3.5, 0.5, 2.0]),
 ('27479', [4.0, 1.5, 1.0]),
 ('129667', [5.0, 5.0, 2.0, 3.0]),
 ('140054', [3.5, 2.0, 4.0]),
 ('45183',
  [3.0,
   1.5,
   3.5,
   3.0,
   3.0,
   2.0,
   3.5,
   2.0,
   1.5,
   3.5,
   4.0,
   4.0,
   3.5,
   5.0,
   3.5,
   4.0,
   0.5,
   2.0,
   4.5,
   5.0,
   2.0,
   4.0,
   4.5,
   4.0,
   5.0,
   3.5,
   2.0,
   3.5,
   4.5,
   3.5,
   0.5,
   5.0,
   4.5,
   3.0,
   3.5,
   3.5,
   4.5,
   3.5,
   4.5,
   4.5,
   4.5,
   3.5,
   5.0,
   0.5,
   4.0,
   4.0,
   2.5,
   2.5,
   2.0,
   3.0,
   4.5,
   3.5,
   2.5,
   4.0,
   3.5,
   4.0,
   3.0,
   2.5,
   4.0,
   4.5,
   4.5,
   4.5,
   4.5,
   4.5,
   4.0,
   3.5,
   3.0,
   2.5,
   3.5,
   4.5,
   2.5,
   5.0,
   3.5,
   3.5,
   3.0,
   2.5,
   3.0,
   4.5,
   2.0,
   4.0,
   3.5,
   3.5,
   3.0,
   3.5,
   3.5,
   5.0,
   4.5,
   3.0,
   3.5,
   3.0,
   3.0,
   3.5,
   5.0,
   5.0,
   4.5,
   3.5,
   4.0,
   2.0,
   4.5,
   3.0,
   5.0,
   4.0,
   3.0,
   4.5,
   2.5,
   4.0,
   2.0,
   4.0,
 

In [53]:
avgRatings01 = mapValuesToListRatings.mapValues(lambda V: sum(V) / float(len(V)))

avgRatings01.take(5)

[('154214', 2.0),
 ('27479', 2.1666666666666665),
 ('129667', 3.75),
 ('140054', 3.1666666666666665),
 ('45183', 3.5485781990521326)]

Is this correct?

In [None]:
(3.5 + 3.5 + 2.5 + 3.5 + 2.0 + 3.5 + 2.5 + 3.0) / 8
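One way to sanity-check the averages is to recompute them in plain Python from the rating lists printed by `mapValuesToListRatings.take(5)`. The sketch below does this for the three short lists shown earlier. The helper name `mean` and the `sample` dictionary are introduced here for illustration only.

```python
# Rating lists copied from the mapValuesToListRatings output above.
sample = {
    '154214': [3.5, 0.5, 2.0],
    '27479': [4.0, 1.5, 1.0],
    '129667': [5.0, 5.0, 2.0, 3.0],
}

def mean(values):
    # Same arithmetic as the lambda passed to mapValues.
    return sum(values) / float(len(values))

checked = {movie_id: mean(values) for movie_id, values in sample.items()}
for movie_id, avg in checked.items():
    print(movie_id, avg)
```

The printed values can then be compared by eye with the corresponding entries of `avgRatings01.take(5)`.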

**reduceByKey and countByKey**

In [None]:
countsByKey = movieRatings.countByKey()

countsByKey

In [None]:
def sumValues(x,y):
    return (x + y)

sumRatings = movieRatings.reduceByKey(sumValues)

sumRatings.take(5)

In [None]:
import operator

sumRatings = movieRatings.reduceByKey(operator.add)
sumRatings.take(5)

In [None]:
avgRatings02 = sumRatings.map(lambda x: (x[0], x[1] / countsByKey.get(x[0])))

avgRatings02.take(5)
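`countByKey` returns its counts to the driver as a dictionary. An alternative that keeps everything in one distributed pass is to carry `(sum, count)` pairs through a single `reduceByKey`. The sketch below illustrates the idea in plain Python so it can be run without a cluster: `add_pairs`, `local_reduce_by_key`, and the sample values are assumptions made for this example, not part of the original notebook.

```python
from collections import defaultdict
from functools import reduce

def add_pairs(a, b):
    # Combine two (sum, count) pairs; associative and commutative,
    # which is what reduceByKey expects of its function.
    return (a[0] + b[0], a[1] + b[1])

def local_reduce_by_key(pairs, func):
    # Plain-Python stand-in for reduceByKey, for illustration only.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

# A few (movieId, rating) pairs in the same shape as movieRatings.
# The values are made up for this sketch.
sample_ratings = [('122', 2.0), ('172', 1.0), ('122', 4.0), ('122', 3.0)]

sum_counts = local_reduce_by_key(
    [(movie_id, (rating, 1)) for movie_id, rating in sample_ratings],
    add_pairs)
averages = {k: s / c for k, (s, c) in sum_counts.items()}
print(averages)

# On the cluster, the same idea would read roughly:
# movieRatings.mapValues(lambda r: (r, 1)) \
#             .reduceByKey(add_pairs) \
#             .mapValues(lambda p: p[0] / p[1])
```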

How do we augment the movie ratings data with title information?

In [54]:
movies = sc.textFile("/repository/movielens/movies.csv")

In [57]:
movieHeader = movies.first() #extract header
print(movieHeader)

movieId,title,genres


In [58]:
movies = movies.filter(lambda x:x != movieHeader)

movies.take(5)

['1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
 '5,Father of the Bride Part II (1995),Comedy']

In [59]:
movieInfo = movies.map(lambda line: (line.split(",")[0], (line.split(",")[1], line.split(",")[2])))

movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]

In [60]:
augmentedRatings = avgRatings01.join(movieInfo)

augmentedRatings.take(5)

[('1440', (2.776470588235294, ('Amos & Andrew (1993)', 'Comedy'))),
 ('106450', (4.25, ('Chicago Overcoat (2009)', 'Action|Drama'))),
 ('564', (2.3294797687861273, ('Chasers (1994)', 'Comedy'))),
 ('108318', (2.8088235294117645, ('"Single Shot', ' A (2013)"'))),
 ('150421', (3.0, ('Man on Horseback (1969)', '(no genres listed)')))]
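Note the entry for movie `108318` in the output above: its title was cut at the comma and the remainder landed in the genre slot. Titles that contain commas are wrapped in double quotes in `movies.csv`, and `line.split(",")` does not respect those quotes. A minimal sketch of a quote-aware parser, using Python's standard `csv` module, might look like the following. The function name `parse_movie_line` and the second sample line are hypothetical.

```python
import csv

def parse_movie_line(line):
    # csv.reader honours double-quoted fields, so a comma inside a
    # quoted title does not start a new column.
    movie_id, title, genres = next(csv.reader([line]))
    return (movie_id, (title, genres))

# First line is taken from movies.csv above; the second is a made-up
# example of a quoted title containing a comma.
plain = '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy'
quoted = '99999,"Example Title, The (2000)",Comedy|Drama'

print(parse_movie_line(plain))
print(parse_movie_line(quoted))
```

Passing `parse_movie_line` to `movies.map(...)` in place of the lambda would be one way to apply it.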

*Movies with the highest average ratings:*

In [61]:
augmentedRatings.takeOrdered(10, key = lambda x : -x[1][0])

[('120436', (5.0, ('Garbo Talks (1984)', 'Comedy|Drama'))),
 ('136874', (5.0, ('Natarang (2010)', '(no genres listed)'))),
 ('146946', (5.0, ('The Hardy Bucks Movie (2013)', 'Comedy'))),
 ('114353', (5.0, ('Heavyweights (Schwere Jungs) (2006)', 'Comedy'))),
 ('123727', (5.0, ('Immigration Tango (2011)', 'Comedy|Romance'))),
 ('164869', (5.0, ('A Cinderella Story: If the Shoe Fits (2016)', 'Comedy'))),
 ('159423',
  (5.0,
   ('Jonas Brothers: The Concert Experience (2009)', '(no genres listed)'))),
 ('133575', (5.0, ('Do Detectives Think? (1927)', 'Comedy'))),
 ('93967',
  (5.0, ('"Keeping the Promise (Sign of the Beaver', ' The) (1997)"'))),
 ('160325', (5.0, ('I Love Hong Kong (2011)', 'Comedy')))]

*Movies with the lowest average ratings:*

In [62]:
augmentedRatings.takeOrdered(10, key = lambda x : x[1][0])

[('164927', (0.5, ('Where Souls Go (2007)', '(no genres listed)'))),
 ('156840', (0.5, ('Jurassic Attack (2012)', 'Action|Sci-Fi'))),
 ('138008', (0.5, ('New Year (2011)', '(no genres listed)'))),
 ('131152', (0.5, ('The Fat Spy (1966)', 'Comedy'))),
 ('109355', (0.5, ('13 Fighting Men (1960)', 'Western'))),
 ('127327', (0.5, ('Khan Kluay (2006)', 'Adventure|Animation|Children'))),
 ('160978', (0.5, ('Hellevator (2004)', 'Horror|Sci-Fi'))),
 ('145285', (0.5, ('Octopus (2000)', 'Action|Horror|Thriller'))),
 ('133541', (0.5, ('Two Hundred Thousand Dirty (2014)', 'Comedy'))),
 ('139717', (0.5, ('10 Cent Pistol (2015)', 'Crime|Thriller')))]

### Challenge

- Augment the mapping process of WordCount with a function that strips punctuation and normalizes capitalization before the unique words are counted

### Challenge:

1. Make appropriate changes so that only movies with an average rating higher than 3.75 are collected.
2. Further enhance your modification so that only movies with an average rating higher than 3.75 and at least 1000 ratings are collected.

### 4.2 Find genres which have the highest average ratings over the years

- Identify the genres associated with a movie and its rating
- Each movie can have multiple genres. How do we flip the key/value pair?

In [63]:
movieRatings.take(5)

[('122', 2.0), ('172', 1.0), ('1221', 5.0), ('1441', 4.0), ('1609', 3.0)]

In [64]:
movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]

In [65]:
augmentedInfo = movieRatings.join(movieInfo)

In [66]:
augmentedInfo.take(5)

[('1440', (3.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (2.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (3.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (5.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (3.0, ('Amos & Andrew (1993)', 'Comedy')))]

In [67]:
def extractGenreRating(t):
    # t has the form (movieId, (rating, (title, genres)))
    rating = t[1][0]
    genreList = t[1][1][1].split("|")
    # emit one (genre, rating) pair per genre so that flatMap can flatten them
    return [(genre, rating) for genre in genreList]

print(extractGenreRating((u'1', (3.0, (u'Toy Story (1995)', u'Adventure|Animation|Children|Comedy|Fantasy')))))

[('Adventure', 3.0), ('Animation', 3.0), ('Children', 3.0), ('Comedy', 3.0), ('Fantasy', 3.0)]


In [68]:
genreRatings = augmentedInfo.flatMap(extractGenreRating)

In [69]:
genreRatings.take(5)

[('Comedy', 3.0),
 ('Comedy', 2.0),
 ('Comedy', 3.0),
 ('Comedy', 5.0),
 ('Comedy', 3.0)]

### Challenge:

Complete the remaining portion of task 4.2: calculate the average rating of each genre over the years.

### 4.3 Find users who rate movies most frequently in order to contact them for in-depth marketing analysis

- How do you define "frequently"?
    - At least once per week?

In [70]:
userRatings = ratingsOnly.map(lambda line: (line.split(",")[0], float(line.split(",")[3])))

In [71]:
ratingGroupByUsers = userRatings.groupByKey().mapValues(list)
ratingGroupByUsers.take(5)

[('51313',
  [847620176.0,
   847620106.0,
   847620374.0,
   847620326.0,
   847620411.0,
   847620264.0,
   847620347.0,
   847619964.0,
   847619965.0,
   847620326.0,
   847620233.0,
   847620294.0,
   847620021.0,
   847620211.0,
   847620021.0,
   847620326.0,
   847619878.0,
   847620211.0,
   847620264.0,
   847620021.0,
   847620374.0,
   847620470.0,
   847619923.0,
   847620049.0,
   847620049.0,
   847620347.0,
   847620411.0,
   847620049.0,
   847620072.0,
   847620176.0,
   847620211.0,
   847620690.0,
   847619900.0,
   847620748.0,
   847620748.0,
   847620049.0,
   847620233.0,
   847620106.0,
   847620656.0,
   847620072.0,
   847619900.0,
   847619878.0]),
 ('23161',
  [945058557.0,
   945063385.0,
   945049906.0,
   945063316.0,
   945049950.0,
   945057901.0,
   945057196.0,
   945057745.0,
   945057745.0,
   945058411.0,
   945050030.0,
   945049753.0,
   945063619.0,
   945061581.0,
   945058080.0,
   945057505.0,
   945057506.0,
   945057314.0,
   945061786.0,


In [72]:
avgRatingFreq = ratingGroupByUsers.mapValues(lambda V: (max(V) - min(V)) / float(len(V)))
avgRatingFreq.take(5)

[('67056', 37.26),
 ('77986', 3230.665418227216),
 ('226527', 8.6),
 ('207119', 12.28),
 ('53196', 7.011363636363637)]

In [None]:
x = [1346139060.0,
   1346139098.0,
   1346139113.0,
   1346139053.0,
   1346139234.0,
   1346139006.0,
   1346139209.0,
   1346139147.0,
   1346138998.0,
   1346139206.0,
   1346139224.0,
   1346139174.0,
   1346139152.0,
   1346139230.0,
   1346139181.0,
   1346139159.0,
   1346139314.0]
(max(x) - min(x)) / float(len(x))
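The expression above divides the time between a user's first and last rating by the number of ratings, so the result is an average gap in seconds. To relate it to the "once per week" question, the gap can be compared against the number of seconds in a week. The sketch below is one way to do that. The names `avg_gap_seconds` and `SECONDS_PER_WEEK` are assumptions introduced for this example.

```python
SECONDS_PER_WEEK = 7 * 24 * 60 * 60  # 604800

def avg_gap_seconds(timestamps):
    # Same formula as the lambda used for avgRatingFreq: time between the
    # first and last rating divided by the number of ratings.
    return (max(timestamps) - min(timestamps)) / float(len(timestamps))

# Timestamps copied from the cell above.
x = [1346139060.0, 1346139098.0, 1346139113.0, 1346139053.0,
     1346139234.0, 1346139006.0, 1346139209.0, 1346139147.0,
     1346138998.0, 1346139206.0, 1346139224.0, 1346139174.0,
     1346139152.0, 1346139230.0, 1346139181.0, 1346139159.0,
     1346139314.0]

gap = avg_gap_seconds(x)
print(gap, gap <= SECONDS_PER_WEEK)
```

With this formula a user who has a single rating gets a gap of 0, so a small gap alone does not imply a large number of ratings.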

In [73]:
topUsers = avgRatingFreq.top(10, key=lambda x: x[1])

In [74]:
topUsers

[('40407', 51121853.75),
 ('241087', 40917744.72727273),
 ('54838', 36601016.0),
 ('248290', 33999095.666666664),
 ('39601', 33138222.0),
 ('155302', 33013341.666666668),
 ('117995', 29406107.25),
 ('183383', 29210786.666666668),
 ('121552', 26685917.875),
 ('74936', 26309319.0)]
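Because `top` returns the largest values, the list above contains the users with the longest average gap between ratings, which is arguably the opposite of "most frequent". One possible reading of the task is to rank by the smallest gap instead, while requiring a minimum number of ratings so that users with one or two ratings do not dominate. The sketch below shows that ranking in plain Python. The data, the `MIN_RATINGS` cut-off, and the variable names are all invented for illustration.

```python
import heapq

# (userId, average gap in seconds, number of ratings); values are
# invented for this sketch and do not come from the dataset.
user_stats = [
    ('u1', 12.0, 500),
    ('u2', 0.0, 1),        # a single rating yields a gap of 0
    ('u3', 86400.0, 300),
    ('u4', 30.0, 40),
    ('u5', 5.0, 250),
]

MIN_RATINGS = 100  # hypothetical cut-off

# Keep only users with enough ratings, then take the smallest gaps.
eligible = [u for u in user_stats if u[2] >= MIN_RATINGS]
most_frequent = heapq.nsmallest(2, eligible, key=lambda u: u[1])
print(most_frequent)
```

On an RDD, `filter` followed by `takeOrdered(10, key=...)`, as used earlier for the lowest average ratings, would play the same role.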

In [None]:
sc.stop()