# Most popular rating - RDD only

This notebook finds the highest rated movies having at least 500 ratings.

First, it's done using lambdas only, then I do the same thing again creating full functions.

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import sql

import pandas as pd

In [2]:
conf = SparkConf().setAppName("MostPopularMovieWithRatingAbove4RDDOnly")
# sc = SparkContext(conf=conf)

## Using lambdas only

### Load Data

In [3]:
path_ratings = r"/home/blake/PycharmProjects/BigDataMovies/the-movies-dataset/ratings.csv"
path_movies_metadata = r"/home/blake/PycharmProjects/BigDataMovies/the-movies-dataset/movies_metadata.csv"

ratings = sc.textFile(path_ratings)
movies_metadata = sc.textFile(path_movies_metadata)

### Define useful functions

In [4]:
def header_dropper(rdd: pyspark.rdd.RDD):
    header = rdd.first()
    print(header)
    no_header_rdd = rdd.filter(lambda row: row != header)
    return no_header_rdd

In [5]:
def get_id_and_title(line):
    movie_id = line['id']
    try:
        movie_id = int(movie_id)
    except (TypeError, ValueError):
        pass
    movie_titile = line['title']
    return movie_id, movie_titile

### Drop Headers

In [6]:
ratings = header_dropper(ratings)

userId,movieId,rating,timestamp


In [7]:
movies_metadata = header_dropper(movies_metadata)

adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,popularity,poster_path,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count


### Analysis

#### Create an RDD with highest ratings with count of ratings above 500

In [8]:
ratings.take(5)

['1,110,1.0,1425941529',
 '1,147,4.5,1425942435',
 '1,858,5.0,1425941523',
 '1,1221,5.0,1425941546',
 '1,1246,5.0,1425941556']

In [9]:
movie_id_and_rating = ratings.map(lambda line: line.split(',')[1:3])
movie_id_and_rating.take(5)

[['110', '1.0'],
 ['147', '4.5'],
 ['858', '5.0'],
 ['1221', '5.0'],
 ['1246', '5.0']]

In [10]:
# Result: movie_id: int, (rating: float, count: int)
movie_id_and_rating_right_formats = movie_id_and_rating.map(
    lambda line: (int(line[0]), (float(line[1]), 1)))
movie_id_and_rating_right_formats.take(6)

[(110, (1.0, 1)),
 (147, (4.5, 1)),
 (858, (5.0, 1)),
 (1221, (5.0, 1)),
 (1246, (5.0, 1)),
 (1968, (4.0, 1))]

In [11]:
movie_id_and_sum_rating_and_sum_count = \
    movie_id_and_rating_right_formats\
        .reduceByKey(lambda row_a, row_b: (row_a[0]+row_b[0], row_a[1]+row_b[1]))
movie_id_and_sum_rating_and_sum_count.collect();

In [12]:
movie_id_and_avg_rating_and_count = \
    movie_id_and_sum_rating_and_sum_count\
        .map(lambda row: (row[0], (row[1][0]/row[1][1], row[1][1])))
movie_id_and_avg_rating_and_count.take(5)

[(110, (4.016057252826558, 66512)),
 (858, (4.339810758717364, 57070)),
 (91542, (3.738952195664258, 7196)),
 (112552, (4.120283855706682, 8455)),
 (1210, (3.9896115699843735, 62714))]

In [13]:
movie_id_and_avg_rating_and_count_above_500 = \
    movie_id_and_avg_rating_and_count\
        .filter(lambda row: row[1][1] >= 500)
movie_id_and_avg_rating_and_count_above_500.take(5)

[(110, (4.016057252826558, 66512)),
 (858, (4.339810758717364, 57070)),
 (91542, (3.738952195664258, 7196)),
 (112552, (4.120283855706682, 8455)),
 (1210, (3.9896115699843735, 62714))]

In [14]:
movie_id_and_avg_rating_and_count_above_500_ordered_by_rating = \
    movie_id_and_avg_rating_and_count_above_500\
        .sortBy(lambda row: row[1][0], False)
movie_id_and_avg_rating_and_count_above_500_ordered_by_rating.take(10)

[(159817, (4.478779840848806, 754)),
 (318, (4.429014514393623, 91082)),
 (858, (4.339810758717364, 57070)),
 (50, (4.300188962561792, 59271)),
 (527, (4.266530696698294, 67662)),
 (1221, (4.263475012950189, 36679)),
 (2019, (4.255073602972702, 13994)),
 (904, (4.232552144363722, 21335)),
 (1203, (4.231208570075758, 16896)),
 (2959, (4.2307160469145675, 60024))]

#### Create an RDD movie_id and movie_title

In [15]:
# movie_medadata file is actually quite broken and it'd be hard 
#to read it and clean it in a form of RDD so I will use 
# DF for convenience and transform it to RDD.
movies_metadata = spark.read.csv(
    path=path_movies_metadata,
    header="true",
    inferSchema="true",
#     quote=""
)

movies_metadata = movies_metadata.rdd
type(movies_metadata)

pyspark.rdd.RDD

In [16]:
movide_id_and_title = movies_metadata.map(get_id_and_title)

#### Join the above RDDs and return the final result

In [17]:
movie_id_and_avg_rating_and_count_above_500_ordered_by_rating.first()

(159817, (4.478779840848806, 754))

In [18]:
movide_id_and_title.first()

(862, 'Toy Story')

In [19]:
results = movie_id_and_avg_rating_and_count_above_500_ordered_by_rating\
    .join(movide_id_and_title)

#### RESULTS

In [20]:
results.take(10)

[(527, ((4.266530696698294, 67662), 'Once Were Warriors')),
 (58559, ((4.182070707070707, 39600), 'Confession of a Child of the Century')),
 (899, ((4.02758537371134, 12416), 'Broken Blossoms')),
 (3007, ((3.9069042316258353, 2245), 'Woman in the Moon')),
 (1643, ((3.8609756097560974, 2870), 'Last Tango in Paris')),
 (5177, ((3.8438320209973753, 762), 'Dark Horse')),
 (5890, ((3.8422360248447207, 805), 'Azumi 2: Death or Love')),
 (2728, ((3.8367664877964343, 5777), 'Postal')),
 (26133, ((3.813053097345133, 1356), 'Tuesdays with Morrie')),
 (62, ((3.750324282255275, 23128), '2001: A Space Odyssey'))]

## More readable version using defined functions

In [21]:
path_ratings = r"/home/blake/PycharmProjects/BigDataMovies/the-movies-dataset/ratings_small.csv"
path_movies_metadata = r"/home/blake/PycharmProjects/BigDataMovies/the-movies-dataset/movies_metadata.csv"

ratings = sc.textFile(path_ratings)
movies_metadata = sc.textFile(path_movies_metadata)

### Define useful functions

In [22]:
def header_dropper(rdd: pyspark.rdd.RDD):
    header = rdd.first()
    print(header)
    no_header_rdd = rdd.filter(lambda row: row != header)
    return no_header_rdd

In [23]:
def get_id_and_title(line):
    movie_id = line['id']
    try:
        movie_id = int(movie_id)
    except (TypeError, ValueError):
        pass
    movie_titile = line['title']
    return movie_id, movie_titile

In [24]:
def get_id_and_rating_and_count(line):
    # Result: movie_id: int, (rating: float, count: int)
    movie_id, movie_rating = line.split(',')[1:3]
    movie_id = int(movie_id)
    movie_rating = float(movie_rating)
    return movie_id, (movie_rating, 1)

In [25]:
def sum_ratings_and_sum_counts(row_a, row_b):
    rating_row_a, rating_row_b = row_a[0], row_b[0]
    count_row_a, count_row_b = row_a[1], row_b[1]
    
    return rating_row_a+rating_row_b, count_row_a+count_row_b

In [26]:
def get_avg_rating_and_count(row):
    movie_id = row[0]
    ratings_total = row[1][0]
    ratings_count =  row[1][1]
    avg_rating = ratings_total/ratings_count
    return movie_id, (avg_rating, ratings_count)

In [27]:
def get_ratings_above_500(row):
    ratings_count = row[1][1]
    return ratings_count >= 500

In [28]:
def get_rating(row):
    movie_rating = row[1][0]
    return movie_rating

### Drop Headers

In [29]:
ratings = header_dropper(ratings)

userId,movieId,rating,timestamp


In [30]:
movies_metadata = header_dropper(movies_metadata)

adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,popularity,poster_path,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count


### Analysis

#### Create an RDD with highest ratings with count of ratings above 500

In [31]:
ratings.take(5)

['1,31,2.5,1260759144',
 '1,1029,3.0,1260759179',
 '1,1061,3.0,1260759182',
 '1,1129,2.0,1260759185',
 '1,1172,4.0,1260759205']

In [32]:
movie_id_and_rating = ratings.map(get_id_and_rating_and_count)
movie_id_and_rating.take(5)

[(31, (2.5, 1)),
 (1029, (3.0, 1)),
 (1061, (3.0, 1)),
 (1129, (2.0, 1)),
 (1172, (4.0, 1))]

In [33]:
movie_id_and_sum_rating_and_sum_count = \
    movie_id_and_rating_right_formats\
        .reduceByKey(sum_ratings_and_sum_counts)
movie_id_and_sum_rating_and_sum_count.take(5)

[(110, (267116.0, 66512)),
 (858, (247673.0, 57070)),
 (91542, (26905.5, 7196)),
 (112552, (34837.0, 8455)),
 (1210, (250204.5, 62714))]

In [34]:
movie_id_and_avg_rating_and_count = \
    movie_id_and_sum_rating_and_sum_count\
        .map(get_avg_rating_and_count)
movie_id_and_avg_rating_and_count.take(5)

[(110, (4.016057252826558, 66512)),
 (858, (4.339810758717364, 57070)),
 (91542, (3.738952195664258, 7196)),
 (112552, (4.120283855706682, 8455)),
 (1210, (3.9896115699843735, 62714))]

In [36]:
movie_id_and_avg_rating_and_count_above_500 = \
    movie_id_and_avg_rating_and_count\
        .filter(get_ratings_above_500)
movie_id_and_avg_rating_and_count_above_500.take(5)

[(110, (4.016057252826558, 66512)),
 (858, (4.339810758717364, 57070)),
 (91542, (3.738952195664258, 7196)),
 (112552, (4.120283855706682, 8455)),
 (1210, (3.9896115699843735, 62714))]

In [37]:
movie_id_and_avg_rating_and_count_above_500_ordered_by_rating = \
    movie_id_and_avg_rating_and_count_above_500\
        .sortBy(get_rating, False)
movie_id_and_avg_rating_and_count_above_500_ordered_by_rating.take(10)

[(159817, (4.478779840848806, 754)),
 (318, (4.429014514393623, 91082)),
 (858, (4.339810758717364, 57070)),
 (50, (4.300188962561792, 59271)),
 (527, (4.266530696698294, 67662)),
 (1221, (4.263475012950189, 36679)),
 (2019, (4.255073602972702, 13994)),
 (904, (4.232552144363722, 21335)),
 (1203, (4.231208570075758, 16896)),
 (2959, (4.2307160469145675, 60024))]

#### Create an RDD movie_id and movie_title

In [38]:
# movie_medadata file is actually quite broken and it'd be hard 
#to read it and clean it in a form of RDD so I will use 
# DF for convenience and transform it to RDD.
movies_metadata = spark.read.csv(
    path=path_movies_metadata,
    header="true",
    inferSchema="true",
)

movies_metadata = movies_metadata.rdd
type(movies_metadata)

pyspark.rdd.RDD

In [39]:
movide_id_and_title = movies_metadata.map(get_id_and_title)

#### Join the above RDDs and return the final result

In [40]:
movie_id_and_avg_rating_and_count_above_500_ordered_by_rating.first()

(159817, (4.478779840848806, 754))

In [41]:
movide_id_and_title.first()

(862, 'Toy Story')

In [42]:
results = movie_id_and_avg_rating_and_count_above_500_ordered_by_rating\
    .join(movide_id_and_title)

#### RESULTS

In [45]:
results.take(10)

[(527, ((4.266530696698294, 67662), 'Once Were Warriors')),
 (58559, ((4.182070707070707, 39600), 'Confession of a Child of the Century')),
 (899, ((4.02758537371134, 12416), 'Broken Blossoms')),
 (3007, ((3.9069042316258353, 2245), 'Woman in the Moon')),
 (1643, ((3.8609756097560974, 2870), 'Last Tango in Paris')),
 (5177, ((3.8438320209973753, 762), 'Dark Horse')),
 (5890, ((3.8422360248447207, 805), 'Azumi 2: Death or Love')),
 (2728, ((3.8367664877964343, 5777), 'Postal')),
 (26133, ((3.813053097345133, 1356), 'Tuesdays with Morrie')),
 (62, ((3.750324282255275, 23128), '2001: A Space Odyssey'))]