# <center> Introduction to Spark In-memory Computing via Python PySpark </center>

In [1]:
!bash launch_spark_cluster.sh

Launching Spark cluster with the following parameters:
Master Node: node0314.palmetto.clemson.edu
Slave Nodes:
node0317.palmetto.clemson.edu
node0355.palmetto.clemson.edu
node0442.palmetto.clemson.edu
Temporary dir: /local_scratch/pbs.8742780.pbs02
Memory per worker (GB): 13G
Cores per worker: 8
Num workers: 3
. /home/aamle/software/spark-2.4.5-bin-hadoop2.7/sbin/start-all.sh -h node0314.palmetto.clemson.edu -d /local_scratch/pbs.8742780.pbs02 -m 13G -c 3
starting org.apache.spark.deploy.master.Master, logging to /home/aamle/software/spark-2.4.5-bin-hadoop2.7/logs/spark-aamle-org.apache.spark.deploy.master.Master-1-node0314.palmetto.clemson.edu.out
node0355.palmetto.clemson.edu: starting org.apache.spark.deploy.worker.Worker, logging to /home/aamle/software/spark-2.4.5-bin-hadoop2.7/logs/spark-aamle-org.apache.spark.deploy.worker.Worker-1-node0355.palmetto.clemson.edu.out
node0317.palmetto.clemson.edu: starting org.apache.spark.deploy.worker.Worker, logging to /home/aamle/software/spar

In [2]:
import sys
import os
import pyspark

env_spark_home=os.path.join(os.environ['HOME'],"software","spark-2.4.5-bin-hadoop2.7")
env_spark_conf_dir=os.path.join(env_spark_home,"conf")
env_pyspark_python=os.path.join("/software","anaconda3","5.1.0","bin","python")

os.environ['SPARK_HOME'] = env_spark_home
os.environ['SPARK_CONF_DIR'] = env_spark_conf_dir
os.environ['PYSPARK_PYTHON'] = env_pyspark_python

# The first line of conf/master holds the master node's hostname
with open(os.path.join(env_spark_conf_dir,"master")) as fp:
    node_list = fp.readlines()

conf = pyspark.SparkConf()
conf.setMaster("spark://" + node_list[0].strip() + ":7077")
conf.setAppName('big-data-workshop')
conf.set("spark.driver.memory","5g")
conf.set("spark.executor.instances", "3")
conf.set("spark.executor.memory","13g")
conf.set("spark.executor.cores","8")

sc = pyspark.SparkContext(conf=conf)

print(sc)

<SparkContext master=spark://node0314.palmetto.clemson.edu:7077 appName=big-data-workshop>


### Movie Ratings

An independent movie company is looking to invest in a new movie project. With limited finances, the company wants to analyze audience reactions, particularly toward different movie genres, to identify the project most likely to be profitable. The company relies on data collected from a publicly available recommendation service, [MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This [dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **24,404,096** ratings and **668,953** tag applications across **40,110** movies. The data were created by **259,137** users between January 09, 1995 and October 17, 2016, and the dataset was generated on October 18, 2016.

From this dataset, several analyses are possible, including the following:
1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for an in-depth marketing analysis.

These analyses are somewhat open-ended, and justifying business decisions with them requires processing large amounts of data in a relatively short time. At this scale, analysis on a single machine is typically impossible, and analysis against a remote storage system is impractical. For the remainder of the lessons, we will learn how HDFS provides the basis for storing massive amounts of data and enables a programming approach to analyzing it.

In [3]:
!ls -lh /zfs/citi/movielens

total 406M
-rw-r--r-- 1 lngo cuuser 318M May 11 14:23 genome-scores.csv
-rw-r--r-- 1 lngo cuuser  18K May 11 14:23 genome-tags.csv
-rw-r--r-- 1 lngo cuuser 840K May 11 14:23 links.csv
-rw-r--r-- 1 lngo cuuser 2.0M May 11 14:23 movies.csv
-rw-r--r-- 1 lngo cuuser 633M May 11 14:24 ratings.csv
-rw-r--r-- 1 lngo cuuser 9.3K May 11 14:23 README.txt
-rw-r--r-- 1 lngo cuuser  23M May 11 14:24 tags.csv


In [4]:
!cat /zfs/citi/movielens/README.txt

Summary

This dataset (ml-latest) describes 5-star rating and free-text tagging activity from [MovieLens](http://movielens.org), a movie recommendation service. It contains 24404096 ratings and 668953 tag applications across 40110 movies. These data were created by 259137 users between January 09, 1995 and October 17, 2016. This dataset was generated on October 18, 2016.

Users were selected at random for inclusion. All selected users had rated at least 1 movies. No demographic information is included. Each user is represented by an id, and no other information is provided.

The data are contained in the files `genome-scores.csv`, `genome-tags.csv`, `links.csv`, `movies.csv`, `ratings.csv` and `tags.csv`. More details about the contents and use of all these files follows.

This is a *development* dataset. As such, it may change over time and is not an appropriate dataset for shared research results. See available *benchmark* datasets if that is your intent.

This and other Gr

In [5]:
!cat /zfs/citi/movielens/links.csv \
    2>/dev/null | head -n 5

movieId,imdbId,tmdbId
1,0114709,862
2,0113497,8844
3,0113228,15602
4,0114885,31357


In [6]:
!cat /zfs/citi/movielens/movies.csv \
    2>/dev/null | head -n 5

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance


In [7]:
!cat /zfs/citi/movielens/ratings.csv \
    2>/dev/null | head -n 5

userId,movieId,rating,timestamp
1,122,2.0,945544824
1,172,1.0,945544871
1,1221,5.0,945544788
1,1441,4.0,945544871


In [8]:
!cat /zfs/citi/movielens/tags.csv \
    2>/dev/null | head -n 5

userId,movieId,tag,timestamp
28,63062,angelina jolie,1263047558
40,4973,Poetic,1436439070
40,117533,privacy,1436439140
57,356,life positive,1291771526


In [9]:
ratings = sc.textFile("/zfs/citi/movielens/ratings.csv")

In [10]:
ratings.cache()

/zfs/citi/movielens/ratings.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [11]:
%%time
ratings.count()

CPU times: user 13 ms, sys: 7.23 ms, total: 20.2 ms
Wall time: 7.35 s


24404097

In [12]:
%%time
ratings.count()

CPU times: user 8.39 ms, sys: 4.65 ms, total: 13 ms
Wall time: 3.11 s


24404097

In [13]:
%%time
ratings.count()

CPU times: user 8.62 ms, sys: 6.16 ms, total: 14.8 ms
Wall time: 3.1 s


24404097

### 4.1 Find movies which have the highest average ratings over the years and identify the corresponding genre.

- Find the average ratings of all movies over the years
- Identify the corresponding genres for each movie

In [14]:
ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,122,2.0,945544824',
 '1,172,1.0,945544871',
 '1,1221,5.0,945544788',
 '1,1441,4.0,945544871']

In [15]:
ratingHeader = ratings.first() #extract header
print(ratingHeader)

userId,movieId,rating,timestamp


In [16]:
ratingsOnly = ratings.filter(lambda x: x != ratingHeader)

In [17]:
ratingsOnly.take(5)

['1,122,2.0,945544824',
 '1,172,1.0,945544871',
 '1,1221,5.0,945544788',
 '1,1441,4.0,945544871',
 '1,1609,3.0,945544824']

In [18]:
movieRatings = ratingsOnly.map(lambda line: (line.split(",")[1], float(line.split(",")[2])))

In [19]:
movieRatings.take(5)

[('122', 2.0), ('172', 1.0), ('1221', 5.0), ('1441', 4.0), ('1609', 3.0)]
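
The lambda used to build `movieRatings` splits every line twice. As a minimal sketch, the same pairs can be produced by a named helper that splits once. The name `parse_rating` is hypothetical and not part of the original notebook; the sample lines are copied from the `take(5)` output above, and the Spark call in the comment is the assumed usage.

```python
def parse_rating(line):
    """Turn one ratings.csv data line into a (movieId, rating) pair."""
    # Fields are userId, movieId, rating, timestamp; none of them contain commas.
    fields = line.split(",")
    return (fields[1], float(fields[2]))


# Assumed usage with the RDD from the notebook:
#   movieRatings = ratingsOnly.map(parse_rating)
sample = ['1,122,2.0,945544824', '1,172,1.0,945544871']
parsed = [parse_rating(line) for line in sample]
print(parsed)
```

The movie ID is kept as a string here, as in the notebook, so that it can later be matched against the string IDs parsed from `movies.csv`.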

**Possible approaches to aggregating data:**
- groupByKey and mapValues
- reduceByKey and countByKey

**groupByKey and mapValues**

In [20]:
groupByKeyRatings = movieRatings.groupByKey()

groupByKeyRatings.take(5)

[('356', <pyspark.resultiterable.ResultIterable at 0x14d230d7e358>),
 ('553', <pyspark.resultiterable.ResultIterable at 0x14d230d7ebe0>),
 ('65261', <pyspark.resultiterable.ResultIterable at 0x14d230d7e400>),
 ('3913', <pyspark.resultiterable.ResultIterable at 0x14d230d7e2e8>),
 ('2959', <pyspark.resultiterable.ResultIterable at 0x14d230d7e5c0>)]

In [21]:
mapValuesToListRatings = groupByKeyRatings.mapValues(list)
mapValuesToListRatings.take(5)

[('114082',
  [4.5,
   3.5,
   2.0,
   4.5,
   4.0,
   4.5,
   3.0,
   2.5,
   4.5,
   3.0,
   3.5,
   5.0,
   5.0,
   4.0,
   4.0,
   4.0,
   4.0,
   3.0,
   4.0,
   4.0,
   3.0,
   5.0,
   4.0,
   4.5,
   3.5,
   1.0,
   5.0,
   4.5,
   2.0,
   2.5,
   4.0,
   3.0,
   4.0,
   3.5,
   3.0,
   0.5,
   3.0,
   3.5,
   2.5,
   3.5,
   3.5,
   3.0,
   3.5,
   4.5,
   2.5,
   4.0,
   4.0,
   2.5,
   1.5,
   5.0,
   4.0,
   2.5,
   4.0,
   4.0,
   3.0,
   1.5,
   3.5,
   5.0,
   3.5,
   5.0,
   5.0,
   3.5,
   5.0,
   4.5,
   1.0,
   4.5,
   5.0,
   5.0,
   4.0,
   0.5,
   3.0,
   5.0,
   3.5,
   4.0,
   3.0,
   2.0,
   2.5,
   3.0,
   4.0,
   5.0,
   0.5,
   0.5]),
 ('114552',
  [4.0,
   1.0,
   2.5,
   3.0,
   5.0,
   3.5,
   3.0,
   4.0,
   1.5,
   3.0,
   3.0,
   4.5,
   3.0,
   3.0,
   4.5,
   3.5,
   4.0,
   1.5,
   3.0,
   3.5,
   1.5,
   3.5,
   2.5,
   3.5,
   2.5,
   3.5,
   4.0,
   3.5,
   4.0,
   4.5,
   4.5,
   2.0,
   1.0,
   3.0,
   2.0,
   4.0,
   5.0,
   5.0,
   3.0,
   5.0

In [22]:
avgRatings01 = mapValuesToListRatings.mapValues(lambda V: sum(V) / float(len(V)))

avgRatings01.take(5)

[('114082', 3.4878048780487805),
 ('114552', 3.3661616161616164),
 ('114601', 3.275092936802974),
 ('2959', 4.229890025011034),
 ('50', 4.308635621494996)]

Is this correct? A quick check of the averaging expression on a small list:

In [23]:
test = [2.0, 4.0, 3.0]
sum(test) / len(test)

3.0

**reduceByKey and countByKey**

In [24]:
countsByKey = movieRatings.countByKey()

countsByKey

defaultdict(int,
            {'122': 4060,
             '172': 14078,
             '1221': 34508,
             '1441': 6723,
             '1609': 780,
             '1961': 29790,
             '1972': 1534,
             '441': 9386,
             '494': 14299,
             '1193': 37995,
             '1597': 12262,
             '1608': 16097,
             '1641': 16953,
             '2628': 34632,
             '3454': 420,
             '3519': 1087,
             '4963': 28770,
             '1': 63469,
             '5': 15023,
             '11': 19281,
             '21': 25337,
             '32': 51931,
             '34': 34569,
             '61': 2441,
             '79': 6288,
             '107': 7982,
             '140': 5623,
             '150': 56077,
             '158': 15206,
             '161': 25761,
             '165': 39597,
             '185': 26301,
             '225': 13294,
             '236': 13426,
             '237': 6712,
             '252': 10830,
             '256': 11

In [25]:
def sumValues(x,y):
    return (x + y)

sumRatings = movieRatings.reduceByKey(sumValues)

sumRatings.take(5)

[('114082', 286.0),
 ('114552', 1333.0),
 ('114601', 881.0),
 ('2959', 230004.5),
 ('50', 242783.0)]

In [26]:
import operator

sumRatings = movieRatings.reduceByKey(operator.add)
sumRatings.take(5)

[('356', 350597.0),
 ('553', 59427.0),
 ('65261', 9253.0),
 ('3913', 413.0),
 ('2959', 230004.5)]

In [27]:
avgRatings02 = sumRatings.map(lambda x: (x[0], x[1] / countsByKey.get(x[0])))

avgRatings02.take(5)

[('114082', 3.4878048780487805),
 ('114552', 3.3661616161616164),
 ('114601', 3.275092936802974),
 ('2959', 4.229890025011034),
 ('50', 4.308635621494996)]
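
Both approaches above have a cost: `groupByKey` materializes the full list of ratings for every movie, and `countByKey` returns the whole count dictionary to the driver. A third option, sketched here under assumptions, is to carry a `(sum, count)` pair per movie and divide at the end. The names `ZERO`, `seq_op`, and `comb_op` are hypothetical; the functions are exercised locally with `functools.reduce`, and the Spark usage with `aggregateByKey` appears only as a comment.

```python
from functools import reduce

ZERO = (0.0, 0)  # (running sum, running count)


def seq_op(acc, rating):
    """Fold one rating into a (sum, count) accumulator."""
    return (acc[0] + rating, acc[1] + 1)


def comb_op(a, b):
    """Merge two (sum, count) accumulators built on different partitions."""
    return (a[0] + b[0], a[1] + b[1])


# Assumed Spark usage:
#   avgRatings = (movieRatings
#                 .aggregateByKey(ZERO, seq_op, comb_op)
#                 .mapValues(lambda p: p[0] / p[1]))

# Local emulation: the ratings of one movie spread over two "partitions".
partition_a = [2.0, 4.0]
partition_b = [3.0]
acc_a = reduce(seq_op, partition_a, ZERO)
acc_b = reduce(seq_op, partition_b, ZERO)
total, count = comb_op(acc_a, acc_b)
average = total / count
print(average)
```

Because the accumulators are merged pairwise, the division happens only once per key, after all partial sums and counts have been combined.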

How do we augment movie ratings data with title information?

In [28]:
movies = sc.textFile("/zfs/citi/movielens/movies.csv")

In [29]:
movieHeader = movies.first() #extract header
print(movieHeader)

movieId,title,genres


In [30]:
movies = movies.filter(lambda x: x != movieHeader)

movies.take(5)

['1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
 '5,Father of the Bride Part II (1995),Comedy']

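**NOTE:** Splitting on commas is not a reliable way to parse CSV, since quoted fields such as titles may themselves contain commas. Instead, consider using a CSV library supported by Spark; we will show this in the next notebook in the series. The following works for now but is difficult to read: it takes the text before the first comma as the movie ID, the text after the last comma as the genres, and everything in between as the title.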

In [31]:
movieInfo = movies.map(lambda line: (line.split(",")[0], ((line.rsplit(",",1)[0]).split(",",1)[1], line.rsplit(",",1)[1])))

movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]
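
As an illustration of the parsing problem, here is a minimal sketch that uses Python's standard `csv` module on a single line. This is not the Spark CSV library the note refers to, and the helper name `parse_movie_line` is hypothetical. The quoted sample line is reconstructed from a title that appears later in this notebook's output.

```python
import csv


def parse_movie_line(line):
    """Parse one movies.csv data line into (movieId, (title, genres))."""
    # csv.reader accepts any iterable of lines and honors quoted fields,
    # so a comma inside a quoted title does not start a new column.
    movie_id, title, genres = next(csv.reader([line]))
    return (movie_id, (title, genres))


quoted = '318,"Shawshank Redemption, The (1994)",Crime|Drama'
plain = '3,Grumpier Old Men (1995),Comedy|Romance'
print(parse_movie_line(quoted))
print(parse_movie_line(plain))

# Assumed Spark usage:
#   movieInfo = movies.map(parse_movie_line)
```

One visible difference from the split-based lambda is that the `csv` module removes the surrounding double quotes from the title, whereas the lambda keeps them.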

In [32]:
augmentedRatings = avgRatings01.join(movieInfo)

augmentedRatings.take(5)

[('2959',
  (4.229890025011034, ('Fight Club (1999)', 'Action|Crime|Drama|Thriller'))),
 ('1198',
  (4.15006599835896,
   ('Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)',
    'Action|Adventure'))),
 ('45728', (3.561777777777778, ('Clerks II (2006)', 'Comedy'))),
 ('4679', (3.29002624671916, ('Uncle Buck (1989)', 'Comedy'))),
 ('69524',
  (3.9783174327840416,
   ('Raiders of the Lost Ark: The Adaptation (1989)',
    'Action|Adventure|Thriller')))]
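
The nested shape of each joined record, `(key, (left_value, right_value))`, is what later expressions such as `x[1][0]` rely on. The following is a small local sketch of inner-join semantics on plain lists, not Spark's implementation; `inner_join` is a hypothetical helper, and the key `'no-such-id'` is made up to show that unmatched keys are dropped.

```python
from collections import defaultdict


def inner_join(left, right):
    """Emulate an inner join of two lists of (key, value) pairs."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    joined = []
    for key, left_value in left:
        # Keys missing on the right side produce no output rows.
        for right_value in right_by_key.get(key, []):
            joined.append((key, (left_value, right_value)))
    return joined


avg_ratings = [('2959', 4.229890025011034), ('no-such-id', 3.0)]
movie_info = [
    ('2959', ('Fight Club (1999)', 'Action|Crime|Drama|Thriller')),
    ('4679', ('Uncle Buck (1989)', 'Comedy')),
]
joined = inner_join(avg_ratings, movie_info)
print(joined)
```

Here `joined[0][1][0]` is the average rating and `joined[0][1][1]` is the `(title, genres)` tuple, matching the layout of `augmentedRatings`.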

*Movies with the highest average ratings:*

In [33]:
augmentedRatings.takeOrdered(10, key = lambda x : -x[1][0])

[('159423',
  (5.0,
   ('Jonas Brothers: The Concert Experience (2009)', '(no genres listed)'))),
 ('112674', (5.0, ('Bulletproof Salesman (2008)', 'Documentary'))),
 ('164869', (5.0, ('A Cinderella Story: If the Shoe Fits (2016)', 'Comedy'))),
 ('114353', (5.0, ('Heavyweights (Schwere Jungs) (2006)', 'Comedy'))),
 ('123727', (5.0, ('Immigration Tango (2011)', 'Comedy|Romance'))),
 ('93967',
  (5.0,
   ('"Keeping the Promise (Sign of the Beaver, The) (1997)"',
    'Children|Drama'))),
 ('160325', (5.0, ('I Love Hong Kong (2011)', 'Comedy'))),
 ('145939', (5.0, ('Sandesham (1991)', 'Children|Comedy'))),
 ('146946', (5.0, ('The Hardy Bucks Movie (2013)', 'Comedy'))),
 ('133575', (5.0, ('Do Detectives Think? (1927)', 'Comedy')))]
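
`takeOrdered(n, key=...)` returns the `n` elements with the smallest key values, so negating the average rating in the key yields the highest-rated movies. A local sketch of the same ordering with `heapq.nsmallest` from the standard library follows; the three rows are copied from the join output earlier in this notebook.

```python
import heapq

rows = [
    ('2959', (4.229890025011034, ('Fight Club (1999)', 'Action|Crime|Drama|Thriller'))),
    ('45728', (3.561777777777778, ('Clerks II (2006)', 'Comedy'))),
    ('4679', (3.29002624671916, ('Uncle Buck (1989)', 'Comedy'))),
]

# Negated key: the smallest negated ratings are the largest ratings.
top2 = heapq.nsmallest(2, rows, key=lambda x: -x[1][0])
# Plain key: the smallest ratings come first.
bottom1 = heapq.nsmallest(1, rows, key=lambda x: x[1][0])

print([movie_id for movie_id, _ in top2])
print(bottom1[0][0])
```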

*Movies with the lowest average ratings:*

In [34]:
augmentedRatings.takeOrdered(10, key = lambda x : x[1][0])

[('73196',
  (0.5, ('My Love Has Been Burning (Waga koi wa moenu) (1949)', 'Drama'))),
 ('80154', (0.5, ('Urban Menace (1999)', 'Action|Horror'))),
 ('88961', (0.5, ('Missile to the Moon (1958)', 'Sci-Fi'))),
 ('109355', (0.5, ('13 Fighting Men (1960)', 'Western'))),
 ('117172', (0.5, ('BFFs (2014)', 'Comedy'))),
 ('119627', (0.5, ('Exit to Hell (2013)', 'Action|Horror|Thriller'))),
 ('124271', (0.5, ('Textuality (2011)', 'Comedy|Romance'))),
 ('127327', (0.5, ('Khan Kluay (2006)', 'Adventure|Animation|Children'))),
 ('131152', (0.5, ('The Fat Spy (1966)', 'Comedy'))),
 ('133541', (0.5, ('Two Hundred Thousand Dirty (2014)', 'Comedy')))]

### Challenge:

Make appropriate changes so that only movies with an average rating higher than 3.75 and at least 1000 ratings are collected.

In [35]:
movies_with_more_ratings = mapValuesToListRatings.filter(lambda x: len(x[1]) >= 1000)
avgRatings03 = movies_with_more_ratings.mapValues(lambda V: sum(V) / float(len(V)))
avgRatings_high = avgRatings03.filter(lambda x: x[1] > 3.75)
augmentedRatings = avgRatings_high.join(movieInfo)
augmentedRatings.takeOrdered(10, key = lambda x : -x[1][0])

[('318',
  (4.43308862707951, ('"Shawshank Redemption, The (1994)"', 'Crime|Drama'))),
 ('858', (4.343623358918333, ('"Godfather, The (1972)"', 'Crime|Drama'))),
 ('50',
  (4.308635621494996,
   ('"Usual Suspects, The (1995)"', 'Crime|Mystery|Thriller'))),
 ('527', (4.2759629983252205, ("Schindler's List (1993)", 'Drama|War'))),
 ('1221',
  (4.263098411962443, ('"Godfather: Part II, The (1974)"', 'Crime|Drama'))),
 ('2019',
  (4.256196804539346,
   ('Seven Samurai (Shichinin no samurai) (1954)', 'Action|Adventure|Drama'))),
 ('94466', (4.2426322163907955, ('Black Mirror (2011)', 'Drama|Sci-Fi'))),
 ('904', (4.238590226483393, ('Rear Window (1954)', 'Mystery|Thriller'))),
 ('1193',
  (4.232701671272536, ("One Flew Over the Cuckoo's Nest (1975)", 'Drama'))),
 ('2959',
  (4.229890025011034, ('Fight Club (1999)', 'Action|Crime|Drama|Thriller')))]

### 4.2 Find genres which have the highest average ratings over the years

- Identify the genres associated with a movie and its rating
- Each movie can have multiple genres. How do we flip the key/value pair so that the genre becomes the key?

In [36]:
movieRatings.take(5)

[('122', 2.0), ('172', 1.0), ('1221', 5.0), ('1441', 4.0), ('1609', 3.0)]

In [37]:
movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]

In [38]:
augmentedInfo = movieRatings.join(movieInfo)

In [39]:
augmentedInfo.take(5)

[('1357', (3.0, ('Shine (1996)', 'Drama|Romance'))),
 ('1357', (5.0, ('Shine (1996)', 'Drama|Romance'))),
 ('1357', (5.0, ('Shine (1996)', 'Drama|Romance'))),
 ('1357', (3.5, ('Shine (1996)', 'Drama|Romance'))),
 ('1357', (3.0, ('Shine (1996)', 'Drama|Romance')))]

In [40]:
def extractGenreRating(t):
    # t has the form (movieId, (rating, (title, genres)))
    final_tuples = []
    genreList = t[1][1][1].split("|")
    for genre in genreList:
        final_tuples.append((genre,t[1][0]))
    return final_tuples

print(extractGenreRating((u'1', (3.0, (u'Toy Story (1995)', u'Adventure|Animation|Children|Comedy|Fantasy')))))

[('Adventure', 3.0), ('Animation', 3.0), ('Children', 3.0), ('Comedy', 3.0), ('Fantasy', 3.0)]


In [41]:
genreRatings = augmentedInfo.flatMap(extractGenreRating)
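
`flatMap` is used here rather than `map` because each input record yields a list of `(genre, rating)` pairs, and the result should be one flat collection of pairs rather than a collection of lists. A local sketch of the difference, using `itertools.chain` as a stand-in for the flattening step; `extract_genre_rating` is a hypothetical rewrite of the function above, and the records are copied from the `augmentedInfo` output.

```python
from itertools import chain


def extract_genre_rating(record):
    """Expand (movieId, (rating, (title, genres))) into (genre, rating) pairs."""
    _movie_id, (rating, (_title, genres)) = record
    return [(genre, rating) for genre in genres.split("|")]


records = [
    ('1357', (3.0, ('Shine (1996)', 'Drama|Romance'))),
    ('1357', (5.0, ('Shine (1996)', 'Drama|Romance'))),
]

# Like map: one output element (a list) per input record.
mapped = [extract_genre_rating(r) for r in records]
# Like flatMap: the per-record lists are concatenated into one sequence.
flat = list(chain.from_iterable(mapped))

print(mapped)
print(flat)
```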

In [42]:
countsByKey = genreRatings.countByKey()

countsByKey

defaultdict(int,
            {'Comedy': 8929200,
             'Romance': 4577905,
             'Drama': 10794442,
             'Crime': 4066762,
             'Mystery': 1925156,
             'Thriller': 6562693,
             'Action': 7080918,
             'Adventure': 5548343,
             'Sci-Fi': 4023632,
             'War': 1280526,
             'Children': 2077277,
             'Animation': 1497462,
             'Fantasy': 2661780,
             'IMAX': 820882,
             'Horror': 1799094,
             'Musical': 1017965,
             'Documentary': 300962,
             'Western': 495361,
             'Film-Noir': 249449,
             '(no genres listed)': 5889})

In [43]:
genreRatings.take(5)

[('Drama', 3.0),
 ('Romance', 3.0),
 ('Drama', 5.0),
 ('Romance', 5.0),
 ('Drama', 5.0)]
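
With `(genre, rating)` pairs available, the average rating per genre can be computed the same way as the per-movie averages. The following is one possible sketch, not the notebook's own solution: each rating becomes a `(rating, 1)` pair, the pairs are summed per genre, and the sum is divided by the count. `add_pairs` and `reduce_by_key` are hypothetical helpers, the latter a local stand-in for `reduceByKey`; the sample pairs are the five shown in the output above.

```python
def add_pairs(a, b):
    """Add two (sum, count) pairs element-wise."""
    return (a[0] + b[0], a[1] + b[1])


def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey on a list of (key, value) pairs."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


genre_ratings = [('Drama', 3.0), ('Romance', 3.0), ('Drama', 5.0),
                 ('Romance', 5.0), ('Drama', 5.0)]

sum_counts = reduce_by_key([(g, (r, 1)) for g, r in genre_ratings], add_pairs)
genre_averages = {g: s / n for g, (s, n) in sum_counts.items()}
print(genre_averages)

# Assumed Spark usage:
#   genreAverages = (genreRatings
#                    .mapValues(lambda r: (r, 1))
#                    .reduceByKey(add_pairs)
#                    .mapValues(lambda p: p[0] / p[1]))
```

On the full dataset, `takeOrdered` with a negated key could then rank the genres, as was done for movies in section 4.1.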

In [44]:
sc.stop()

In [45]:
!bash stop_spark_cluster.sh

Stopping Spark cluster:
node0355.palmetto.clemson.edu: stopping org.apache.spark.deploy.worker.Worker
node0317.palmetto.clemson.edu: stopping org.apache.spark.deploy.worker.Worker
node0442.palmetto.clemson.edu: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
