# Movie Rating Analytics

In this notebook, I analyze movie rating data using Spark from Python (PySpark). I followed the instructions [here](https://www.udemy.com/taming-big-data-with-apache-spark-hands-on/learn/v4/t/lecture/3700612?start=0) to set up PySpark and perform the analysis.

The data was downloaded from [GroupLens](https://grouplens.org/).

#### Problem 1:
Develop a histogram of movie ratings. What is the frequency of each rating?

``I save the code to a new file and run it from the Jupyter notebook with an inline shell command. This way, the Spark context does not have to be defined again for every task.``

Each line of the data set has the following columns:

- ``User_id``
- ``Movie_id``
- ``Rating``
- ``Timestamp``
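
As a plain-Python sketch of the counting step (no Spark needed), the snippet below splits tab-separated lines into the four columns listed above and tallies the ratings with `collections.Counter`, which mirrors the kind of value-to-count mapping that `countByValue()` returns for an RDD. The sample lines, the `Rating` record type, and the `parse_line` helper are made up for illustration and are not part of the notebook's scripts.

```python
from collections import Counter, namedtuple

# Hypothetical record type for one line of the ratings file
Rating = namedtuple('Rating', ['user_id', 'movie_id', 'rating', 'timestamp'])

def parse_line(line):
    """Split one tab-separated line into its four columns (all kept as strings)."""
    user_id, movie_id, rating, timestamp = line.strip().split('\t')
    return Rating(user_id, movie_id, rating, timestamp)

# Made-up sample lines in the same layout as the data set
sample_lines = [
    '1\t10\t3\t100000001\n',
    '2\t10\t4\t100000002\n',
    '3\t20\t4\t100000003\n',
]

# Count how often each rating value occurs
rating_counts = Counter(parse_line(line).rating for line in sample_lines)
for rating, count in rating_counts.most_common():
    print(rating, count)
```

Keeping the rating as a string matches the Spark script, which also counts the raw third column without converting it to a number.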

In [1]:
%%file codes/rating_hist.py

import findspark
findspark.init()
from pyspark import SparkConf, SparkContext  # SparkContext is the entry point for working with RDDs
# SparkConf allows configuring the Spark context
from collections import OrderedDict # to preserve the order in which entries are added
import matplotlib.pyplot as plt
conf = SparkConf().setMaster('local').setAppName('Histogram') # run locally on a single core of this machine
sc = SparkContext(conf = conf)  # main entry point for Spark

# load the data into an RDD
# no need to add C:/
lines = sc.textFile('file:///Users/Amin/Dropbox/Career Deveoment/Data Science/PySpark/Movierating/data/raw/ml-100k/u.data')
# map each line to its rating value (the third column)
# map is a one-to-one transformation between input and output
ratings = lines.map(lambda x: x.strip().split('\t')[2])
# count how often each distinct rating value occurs (no key is needed)
histogram = ratings.countByValue()
ordered_hist = OrderedDict(sorted(histogram.items(), key = lambda t:t[1], reverse=True)) # sort by frequency, largest first

ratings = []
frequencies = []
for rating, frequency in ordered_hist.items():
    print('Rating *{}* has {} entry.'.format(rating, frequency))
    ratings.append(str(rating))
    frequencies.append(frequency)
    
    
plt.bar(ratings, frequencies)
plt.xlabel('Ratings')
plt.ylabel('Frequencies')
plt.show()

Overwriting codes/rating_hist.py


In [4]:
! python codes/rating_hist.py

Rating *4* has 34174 entry.
Rating *3* has 27144 entry.
Rating *5* has 21201 entry.
Rating *2* has 11370 entry.
Rating *1* has 6110 entry.
Rating *23* has 1 entry.
Figure(640x480)
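
The output above contains a single rating of `23`, which lies outside the 1 to 5 scale and probably points to a malformed line in this copy of the file. One possible guard, sketched here in plain Python under the assumption that only whole-star ratings from 1 to 5 are valid, is to drop unexpected values before counting. `VALID_RATINGS` and `is_valid_rating` are illustrative names, and the sample values are made up; the same predicate could be passed to an RDD's `filter` before `countByValue()`.

```python
# Assumed set of valid rating values (kept as strings, like the raw column)
VALID_RATINGS = {'1', '2', '3', '4', '5'}

def is_valid_rating(value):
    """Return True if the raw rating string is on the assumed 1-5 scale."""
    return value in VALID_RATINGS

# Made-up raw values, including one out-of-range entry
raw_ratings = ['4', '3', '23', '5']
clean_ratings = [r for r in raw_ratings if is_valid_rating(r)]
print(clean_ratings)
```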


#### Problem 2:
Find the movies with the highest average rating among those rated more than 100 times.
We also use a broadcast variable to print movie names instead of movie ids.
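
To make the averaging step concrete, here is a small plain-Python sketch of the (sum, count) pattern: every rating contributes `(rating, 1)`, contributions for the same movie are added element-wise, and the average is the accumulated sum divided by the accumulated count. It only imitates what a `reduceByKey` followed by `filter` and `mapValues` does on an RDD. The sample data, the `min_count` threshold of 1, and the helper name `average_ratings` are assumptions for this example.

```python
from collections import defaultdict

def average_ratings(pairs, min_count):
    """Return {movie_id: average} for movies with more than min_count ratings."""
    totals = defaultdict(lambda: (0.0, 0))  # movie_id -> (sum of ratings, count)
    for movie_id, rating in pairs:
        rating_sum, count = totals[movie_id]
        totals[movie_id] = (rating_sum + rating, count + 1)
    return {
        movie_id: round(rating_sum / count, 3)
        for movie_id, (rating_sum, count) in totals.items()
        if count > min_count
    }

# Made-up (movie_id, rating) pairs
sample = [('10', 3.0), ('10', 4.0), ('20', 5.0), ('10', 5.0)]
averages = average_ratings(sample, min_count=1)

# Sort by average rating, highest first
top = sorted(averages.items(), key=lambda kv: kv[1], reverse=True)
print(top)
```

Carrying the count alongside the sum is what makes the reduction safe to run in any order, since averages themselves cannot be combined pairwise.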

In [15]:
%%file codes/top_movies.py

# we need to join movie names with movie ids,
# so we build a look-up table from movie id to movie name
import os
def movie_lut(): # movie look-up table
    # build the path from its components so it does not depend on Windows backslashes
    file = os.path.join(os.getcwd(), '..', 'data', 'raw', 'ml-100k', 'u.item')
    with open(file) as items:
        movie_dict = dict()  # initializing the look-up table
        for line in items:
            data = line.strip().split('|')  # the file is pipe-delimited
            movie_id = data[0]
            movie_name = data[1]
            movie_dict[movie_id] = movie_name
    return movie_dict

look_up_table = movie_lut()

# accessing pyspark module
import findspark
findspark.init()

# configure and connect to spark context
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local[4]').setAppName('TopMovies')
sc = SparkContext(conf = conf)
# broadcast the look-up table to all the nodes
broadcasted_lut = sc.broadcast(look_up_table)
# import the data into an RDD
movies = sc.textFile('file:///Users/Amin/Dropbox/Career Deveoment/Data Science/PySpark/Movierating/data/raw/ml-100k/u.data')
# parse the lines into (movie_id, (rating, 1)) pairs
movie_rating = movies.map(lambda x: x.strip().split('\t')[1:3]).map(lambda x: (x[0], (float(x[1]),1))) # appended 1 to simplify averaging
# sum ratings and counts per movie, keep movies with more than 100 ratings, then average
movie_by_rating = movie_rating.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).filter(lambda x: x[1][1] > 100).mapValues(lambda x: round(x[0] / x[1],3)) # average
sorted_movies = movie_by_rating.sortBy(lambda x: x[1], ascending = False).map(lambda x: (broadcasted_lut.value[x[0]], x[1])) # replacing movie id by movie name
# collect the results
#top_movie = sorted_movies.first() # first sorted element
# output the results
results = sorted_movies.take(10)
print("Movie:" + "\t" + 'Avg Rating')
print('-'*30)
for movie, avgrat in results:
    print(movie, ":\t", avgrat)
    
# close the context
sc.stop()

Overwriting codes/top_movies.py


In [16]:
!python codes/top_movies.py

Movie:	Avg Rating
------------------------------
Close Shave, A (1995) :	 4.491
Schindler's List (1993) :	 4.466
Wrong Trousers, The (1993) :	 4.466
Casablanca (1942) :	 4.457
Shawshank Redemption, The (1994) :	 4.445
Rear Window (1954) :	 4.388
Usual Suspects, The (1995) :	 4.386
Star Wars (1977) :	 4.358
12 Angry Men (1957) :	 4.344
Citizen Kane (1941) :	 4.293


#### Problem 3:
We apply the same logic as in Problem 2, this time on AWS EMR.
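
The script for this problem differs from the local one mainly in where the data lives and in the field separator: the rating lines are split on `::` instead of a tab. As a hedged illustration, the plain-Python helper below parses both layouts by taking the separator as a parameter. The name `parse_rating` and the sample lines are hypothetical and do not appear in the scripts.

```python
def parse_rating(line, sep):
    """Return (movie_id, (rating, 1)) from one line.

    Assumes the columns are user id, movie id, rating, timestamp,
    separated by `sep`.
    """
    fields = line.strip().split(sep)
    return fields[1], (float(fields[2]), 1)

# Made-up lines in the tab-separated and '::'-separated layouts
print(parse_rating('1\t10\t3\t100000001', sep='\t'))
print(parse_rating('1::10::3::100000001', sep='::'))
```

Both calls yield the same `(movie_id, (rating, 1))` pair, so the averaging logic that follows stays unchanged between the two data layouts.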

In [3]:
%%file codes/top_movies_1m.py

# to run on EMR successfully, first copy the files to the current working directory (master node):
# aws s3 cp s3://pyspark-0000/top_movies_1m.py ./
# aws s3 cp s3://pyspark-0000/movies.dat ./
# spark-submit --executor-memory 1g top_movies_1m.py
# we need to join movie names with movie ids,
# so we build a look-up table from movie id to movie name
def movie_lut(): # movie look-up table
    file = 'movies.dat'
    with open(file) as items:
        movie_dict = dict()  # initializing the look-up table
        for line in items:
            data = line.strip().split('::')  # the fields are separated by '::'
            movie_id = data[0]
            movie_name = data[1]
            movie_dict[movie_id] = movie_name
    return movie_dict

look_up_table = movie_lut()

# accessing pyspark module
#import findspark
#findspark.init()

# configure and connect to spark context
from pyspark import SparkConf, SparkContext
conf = SparkConf() # leaving SparkConf empty lets Spark pick up the cluster's defaults (YARN on EMR)
sc = SparkContext(conf = conf)
# broadcast the look-up table to all the nodes
broadcasted_lut = sc.broadcast(look_up_table)
# import the data into an RDD
movies = sc.textFile('s3://pyspark-0000/ratings.dat')
# parse the lines into (movie_id, (rating, 1)) pairs
movie_rating = movies.map(lambda x: x.strip().split('::')[1:3]).map(lambda x: (x[0], (float(x[1]),1))) # appended 1 to simplify averaging
# sum ratings and counts per movie, keep movies with more than 100 ratings, then average
movie_by_rating = movie_rating.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).filter(lambda x: x[1][1] > 100).mapValues(lambda x: round(x[0] / x[1],3)) # average
sorted_movies = movie_by_rating.sortBy(lambda x: x[1], ascending = False).map(lambda x: (broadcasted_lut.value[x[0]], x[1])) # replacing movie id by movie name
# collect the results
#top_movie = sorted_movies.first() # first sorted element
# output the results
results = sorted_movies.take(10)
print("Movie:" + "\t" + 'Avg Rating')
print('-'*30)
for movie, avgrat in results:
    print(movie, ":\t", avgrat)
    
# close the context
sc.stop()

Overwriting codes/top_movies_1m.py
