# <center> Introduction to Spark In-memory Computing via Python PySpark </center>

In [None]:
!bash launch_spark_cluster.sh

In [None]:
import os
import pyspark

# Locations of the Spark installation, its configuration directory, and the
# Python interpreter that the executors should use
env_spark_home = os.path.join(os.environ['HOME'], "software", "spark-2.4.5-bin-hadoop2.7")
env_spark_conf_dir = os.path.join(env_spark_home, "conf")
env_pyspark_python = os.path.join("/software", "anaconda3", "5.1.0", "bin", "python")

os.environ['SPARK_HOME'] = env_spark_home
os.environ['SPARK_CONF_DIR'] = env_spark_conf_dir
os.environ['PYSPARK_PYTHON'] = env_pyspark_python

# The first line of conf/master holds the hostname of the Spark master node
with open(os.path.join(env_spark_conf_dir, "master")) as fp:
    node_list = fp.readlines()

conf = pyspark.SparkConf()
conf.setMaster("spark://" + node_list[0].strip() + ":7077")
conf.setAppName('big-data-workshop')
conf.set("spark.driver.memory", "5g")
conf.set("spark.executor.instances", "3")
conf.set("spark.executor.memory", "13g")
conf.set("spark.executor.cores", "8")

sc = pyspark.SparkContext(conf=conf)

print(sc)

### Movie Ratings

An independent movie company is looking to invest in a new movie project. With limited finances, the company wants to analyze audience reactions, particularly toward different movie genres. The goal is to identify a project that is likely to earn the business more profit. The company relies on data collected from the publicly available recommendation service [MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This [dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **24,404,096** ratings and **668,953** tags applied across **40,110** movies. The data was created by **247,753** users between January 09, 1995 and January 29, 2016, and the dataset was generated on October 17, 2016.

From this dataset, several analyses are possible, including the following:
1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for an in-depth marketing analysis.

These types of analyses are somewhat open-ended. To justify business decisions, they require processing large amounts of data in a short time. At this scale, analysis on a single machine is typically impossible, and analysis done using a remote storage system is impractical. For the remainder of the lessons, we will learn how HDFS provides a basis for storing massive amounts of data. We will also see how a programming model built on top of it enables the analysis of that data.

In [None]:
!ls /zfs/citi/movielens

In [None]:
!cat /zfs/citi/movielens/README.txt

In [None]:
!cat /zfs/citi/movielens/links.csv \
    2>/dev/null | head -n 5

In [None]:
!cat /zfs/citi/movielens/movies.csv \
    2>/dev/null | head -n 5

In [None]:
!cat /zfs/citi/movielens/ratings.csv \
    2>/dev/null | head -n 5

In [None]:
!cat /zfs/citi/movielens/tags.csv \
    2>/dev/null | head -n 5

In [None]:
ratings = sc.textFile("/zfs/citi/movielens/ratings.csv")

In [None]:
ratings.cache()

In [None]:
%%time
ratings.count()

In [None]:
%%time
ratings.count()

In [None]:
%%time
ratings.count()
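
Running `count()` three times shows the effect of caching. `cache()` only marks `ratings` for caching, and nothing is read until the first action. That action reads the file and, memory permitting, keeps the partitions in executor memory, so later counts typically run faster. The pure-Python toy below is only an analogy for this lazy, cache-on-first-action behavior. It is not how Spark is implemented. `ToyDataset`, its methods, and the sample lines are hypothetical.

```python
class ToyDataset:
    """Toy analogy (not Spark's implementation) of a lazily evaluated dataset."""

    def __init__(self, load):
        self._load = load              # function that "reads the file"
        self._cache_requested = False
        self._cached = None
        self.source_reads = 0          # how many times the source was read

    def cache(self):
        # Like RDD.cache(): only marks the dataset, nothing is read yet
        self._cache_requested = True
        return self

    def _records(self):
        if self._cached is not None:
            return self._cached        # later actions reuse the cached records
        self.source_reads += 1
        records = list(self._load())
        if self._cache_requested:
            self._cached = records     # the first action populates the cache
        return records

    def count(self):
        return len(self._records())


def load_sample():
    # Hypothetical stand-in for reading ratings.csv
    return ["userId,movieId,rating,timestamp", "1,1,4.0,0", "2,1,3.0,0"]


cached = ToyDataset(load_sample).cache()
print(cached.source_reads)   # 0: cache() alone reads nothing
print(cached.count())        # 3: the first action reads the source
print(cached.count())        # 3: answered from the cache
print(cached.source_reads)   # 1

uncached = ToyDataset(load_sample)
uncached.count()
uncached.count()
print(uncached.source_reads) # 2: every action re-reads the source
```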

### 4.1 Find movies which have the highest average ratings over the years and identify the corresponding genre.

- Find the average ratings of all movies over the years
- Identify the corresponding genres for each movie

In [None]:
ratings.take(5)

In [None]:
ratingHeader = ratings.first() #extract header
print(ratingHeader)

In [None]:
ratingsOnly = ratings.filter(lambda x: x != ratingHeader)

In [None]:
ratingsOnly.take(5)

In [None]:
movieRatings = ratingsOnly.map(lambda line: (line.split(",")[1], float(line.split(",")[2])))

In [None]:
movieRatings.take(5)
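
The `filter` and `map` above drop the header and turn each ratings line into a `(movieId, rating)` pair. The sketch below repeats that logic in plain Python on made-up lines. It assumes the `userId,movieId,rating,timestamp` column layout that the indices above imply. The helper `parse_rating` is a hypothetical name, and it splits each line once instead of twice. A named function like this could also be passed to `ratingsOnly.map(...)`.

```python
# Hypothetical sample lines, assuming the layout userId,movieId,rating,timestamp
lines = [
    "userId,movieId,rating,timestamp",
    "1,122,2.0,945544824",
    "1,172,1.0,945544871",
]

header = lines[0]

def parse_rating(line):
    # Split once and reuse the fields instead of calling split() twice
    fields = line.split(",")
    return (fields[1], float(fields[2]))   # (movieId as a string, rating)

# filter out the header, then map each remaining line to a pair
pairs = [parse_rating(line) for line in lines if line != header]
print(pairs)  # [('122', 2.0), ('172', 1.0)]
```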

**Possible approaches to aggregating data:**
- groupByKey and mapValues
- reduceByKey and countByKey

**groupByKey and mapValues**

In [None]:
groupByKeyRatings = movieRatings.groupByKey()

groupByKeyRatings.take(5)

In [None]:
mapValuesToListRatings = groupByKeyRatings.mapValues(list)
mapValuesToListRatings.take(5)

In [None]:
avgRatings01 = mapValuesToListRatings.mapValues(lambda V: sum(V) / float(len(V)))

avgRatings01.take(5)
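
The sketch below shows what `groupByKey` and `mapValues` do without a cluster. It reproduces them with a dictionary of lists on a few hypothetical `(movieId, rating)` pairs. `groupByKey` moves every individual rating for a key across the network before averaging. That is one reason the `reduceByKey` approach below is usually preferred on large data.

```python
from collections import defaultdict

# Hypothetical (movieId, rating) pairs, shaped like movieRatings
movie_ratings = [("1", 4.0), ("2", 3.0), ("1", 5.0), ("2", 2.0), ("1", 3.0)]

# groupByKey: gather all values that share a key
grouped = defaultdict(list)
for movie_id, rating in movie_ratings:
    grouped[movie_id].append(rating)

# mapValues: transform each value (here, a list of ratings) and keep the key
avg_ratings = {movie_id: sum(vals) / len(vals) for movie_id, vals in grouped.items()}
print(avg_ratings)  # {'1': 4.0, '2': 2.5}
```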

Is this correct? A quick check of the averaging formula on a small list:

In [None]:
test = [2.0, 4.0, 3.0]
sum(test) / len(test)

**reduceByKey and countByKey**

In [None]:
countsByKey = movieRatings.countByKey()

countsByKey

In [None]:
def sumValues(x,y):
    return (x + y)

sumRatings = movieRatings.reduceByKey(sumValues)

sumRatings.take(5)

In [None]:
import operator

sumRatings = movieRatings.reduceByKey(operator.add)
sumRatings.take(5)

In [None]:
avgRatings02 = sumRatings.map(lambda x: (x[0], x[1] / countsByKey.get(x[0])))

avgRatings02.take(5)
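
The `reduceByKey`/`countByKey` route can be sketched the same way. `countByKey` is an action: it returns an ordinary dictionary of counts to the driver. That dictionary is then used inside the `map` that computes the averages. This is only practical when the number of distinct keys is small; here there is one entry per movie. The plain-Python version below uses hypothetical data. It should produce the same averages as the `groupByKey` approach.

```python
import operator
from collections import Counter

# Hypothetical (movieId, rating) pairs, shaped like movieRatings
movie_ratings = [("1", 4.0), ("2", 3.0), ("1", 5.0), ("2", 2.0), ("1", 3.0)]

# countByKey: number of records per key, returned as a dict-like object
counts_by_key = Counter(movie_id for movie_id, _ in movie_ratings)

# reduceByKey(operator.add): combine values that share a key, two at a time
sum_ratings = {}
for movie_id, rating in movie_ratings:
    if movie_id in sum_ratings:
        sum_ratings[movie_id] = operator.add(sum_ratings[movie_id], rating)
    else:
        sum_ratings[movie_id] = rating

# Same formula as avgRatings02: per-key sum divided by per-key count
avg_ratings = {movie_id: total / counts_by_key[movie_id]
               for movie_id, total in sum_ratings.items()}
print(dict(counts_by_key))  # {'1': 3, '2': 2}
print(avg_ratings)          # {'1': 4.0, '2': 2.5}
```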

How do we augment the movie ratings data with title and genre information?

In [None]:
movies = sc.textFile("/zfs/citi/movielens/movies.csv")

In [None]:
movieHeader = movies.first() #extract header
print(movieHeader)

In [None]:
movies = movies.filter(lambda x: x != movieHeader)

movies.take(5)

**NOTE:** Splitting on commas is not a robust way to parse CSV, because some fields (such as movie titles) may themselves contain commas. Instead, consider using a CSV library supported by Spark; we will show this in the next notebook in the series. The following parsing works for now but is difficult to understand.

In [None]:
movieInfo = movies.map(lambda line: (line.split(",")[0], ((line.rsplit(",",1)[0]).split(",",1)[1], line.rsplit(",",1)[1])))

movieInfo.take(5)
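
The sketch below unpacks the one-line lambda. It rewrites the lambda as a named function (`parse_with_split`, a hypothetical name). It then compares that function with Python's standard `csv` module, which understands quoted fields. The two sample lines are illustrative and use the `movieId,title,genres` layout. Both approaches find the right fields. However, the split-based version keeps the surrounding quotes on titles that contain commas.

```python
import csv

# Illustrative lines in the movies.csv layout movieId,title,genres;
# the second title contains a comma, so it is wrapped in double quotes
lines = [
    "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy",
    '11,"American President, The (1995)",Comedy|Drama|Romance',
]

def parse_with_split(line):
    # Same logic as the one-line lambda, one step at a time
    movie_id, rest = line.split(",", 1)   # movieId is before the first comma
    title, genres = rest.rsplit(",", 1)   # genres are after the last comma
    return (movie_id, (title, genres))

def parse_with_csv(line):
    # csv.reader respects quoting, so commas inside a title are not separators
    movie_id, title, genres = next(csv.reader([line]))
    return (movie_id, (title, genres))

for line in lines:
    print(parse_with_split(line))
    print(parse_with_csv(line))
```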

In [None]:
augmentedRatings = avgRatings01.join(movieInfo)

augmentedRatings.take(5)
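
`join` on pair RDDs is an inner join on the key. Each `(k, v)` from the left side is paired with each `(k, w)` from the right side that has the same key, giving `(k, (v, w))`. Keys present on only one side are dropped. The pure-Python model below uses a hypothetical `inner_join` helper and sample data. It shows the one-to-one case used here. It also shows the many-to-one case that appears later, when every individual rating is joined with its movie's information. Spark does not guarantee the output order; the list order here is just a property of the model.

```python
from collections import defaultdict

def inner_join(left, right):
    # Model of pair-RDD join: pair every (k, v) in left with every (k, w)
    # in right that has the same key, producing (k, (v, w))
    right_by_key = defaultdict(list)
    for k, w in right:
        right_by_key[k].append(w)
    return [(k, (v, w)) for k, v in left for w in right_by_key.get(k, [])]

# Hypothetical movie information, shaped like movieInfo
movie_info = [
    ("1", ("Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy")),
    ("2", ("Jumanji (1995)", "Adventure|Children|Fantasy")),
]

# One-to-one: one average per movie; movie "999" has no info and is dropped
avg_ratings = [("1", 3.9), ("2", 3.2), ("999", 4.0)]
print(inner_join(avg_ratings, movie_info))

# Many-to-one: each individual rating is paired with its movie's information
movie_ratings = [("1", 4.0), ("1", 3.0)]
print(inner_join(movie_ratings, movie_info))
```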

*Movies with the highest average ratings (top 10):*

In [None]:
augmentedRatings.takeOrdered(10, key=lambda x: -x[1][0])

*Movies with the lowest average ratings (bottom 10):*

In [None]:
augmentedRatings.takeOrdered(10, key=lambda x: x[1][0])
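
`takeOrdered(n, key=...)` returns the `n` elements with the smallest key values. That is why the highest averages are obtained by negating the rating. The sketch below mimics this with `heapq.nsmallest` on hypothetical joined records. A movie with only one or two ratings can easily reach the top or bottom of these lists. This motivates the minimum-count condition in the challenge below.

```python
import heapq

# Hypothetical records shaped like augmentedRatings: (movieId, (avg, (title, genres)))
augmented = [
    ("1", (3.9, ("Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"))),
    ("2", (3.2, ("Jumanji (1995)", "Adventure|Children|Fantasy"))),
    ("3", (4.5, ("Hypothetical Movie (2000)", "Drama"))),
]

# Like takeOrdered(2, key=lambda x: -x[1][0]): the two highest averages
top = heapq.nsmallest(2, augmented, key=lambda x: -x[1][0])
# Like takeOrdered(2, key=lambda x: x[1][0]): the two lowest averages
bottom = heapq.nsmallest(2, augmented, key=lambda x: x[1][0])

print([movie_id for movie_id, _ in top])     # ['3', '1']
print([movie_id for movie_id, _ in bottom])  # ['2', '1']
```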

### Challenge:

Make the appropriate changes so that only movies with an average rating higher than 3.75 and at least 1,000 ratings in total are collected.

### 4.2 Find genres which have the highest average ratings over the years

- Identify the genres associated with a movie and its rating
- Each movie can have multiple genres. How do we flip the key/value pair so that the genre becomes the key?

In [None]:
movieRatings.take(5)

In [None]:
movieInfo.take(5)

In [None]:
augmentedInfo = movieRatings.join(movieInfo)

In [None]:
augmentedInfo.take(5)

In [None]:
def extractGenreRating(t):
    # t has the shape (movieId, (rating, (title, genres)));
    # emit one (genre, rating) pair for every genre listed for the movie
    final_tuples = []
    genreList = t[1][1][1].split("|")
    for genre in genreList:
        final_tuples.append((genre, t[1][0]))
    return final_tuples

print(extractGenreRating(('1', (3.0, ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')))))

In [None]:
genreRatings = augmentedInfo.flatMap(extractGenreRating)
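
This is how the key/value flip works. `extractGenreRating` replaces the `movieId` key with one `(genre, rating)` pair per genre. `flatMap` then flattens the returned lists into a single RDD of pairs, whereas `map` would produce one list per input record. The plain-Python comparison below uses a hypothetical `extract_genre_rating` copy of the function and two sample records.

```python
def extract_genre_rating(t):
    # t has the shape (movieId, (rating, (title, genres)))
    rating = t[1][0]
    return [(genre, rating) for genre in t[1][1][1].split("|")]

# Hypothetical records shaped like augmentedInfo
augmented_info = [
    ("1", (3.0, ("Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"))),
    ("2", (4.0, ("Jumanji (1995)", "Adventure|Children|Fantasy"))),
]

# map: exactly one output per input, so each output is a whole list
mapped = [extract_genre_rating(t) for t in augmented_info]
# flatMap: the returned lists are flattened into one sequence of pairs
flat_mapped = [pair for t in augmented_info for pair in extract_genre_rating(t)]

print(len(mapped))       # 2
print(len(flat_mapped))  # 8
print(flat_mapped[:2])   # [('Adventure', 3.0), ('Animation', 3.0)]
```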

In [None]:
countsByKey = genreRatings.countByKey()

countsByKey

In [None]:
genreRatings.take(5)
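
The notebook stops at counting ratings per genre. One way to finish the analysis is to carry a `(sum, count)` pair per genre, so that the averages come out of a single aggregation. The sketch below does this in plain Python on hypothetical data. The comment shows how the same idea might be written with `mapValues` and `reduceByKey` on `genreRatings`. That Spark snippet is an untested sketch.

```python
# Hypothetical (genre, rating) pairs, shaped like genreRatings
genre_ratings = [("Comedy", 4.0), ("Drama", 3.0), ("Comedy", 2.0), ("Drama", 5.0), ("Horror", 1.5)]

# Possible Spark equivalent (untested sketch, not executed here):
#   genreAvg = (genreRatings
#               .mapValues(lambda r: (r, 1))
#               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
#               .mapValues(lambda p: p[0] / p[1]))

# Accumulate a (sum, count) pair per genre in one pass
sum_count = {}
for genre, rating in genre_ratings:
    s, c = sum_count.get(genre, (0.0, 0))
    sum_count[genre] = (s + rating, c + 1)

# Average per genre, then rank from highest to lowest
genre_avg = {g: s / c for g, (s, c) in sum_count.items()}
ranked = sorted(genre_avg.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)  # [('Drama', 4.0), ('Comedy', 3.0), ('Horror', 1.5)]
```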

In [None]:
sc.stop()

In [None]:
!bash stop_spark_cluster.sh