In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkConf,SparkContext

In [3]:
# local[*] means make use of all available cores on the system and distribute the tasks through its own cluster
# manager i.e spark cluster manager.
conf = SparkConf().setMaster("local[*]").setAppName("SimilarMaovies")
sc = SparkContext(conf=conf)

In [4]:
base_rdd = sc.textFile('file:///Users/hdagar3/Documents/Spark_Things/Spark_Course_Files/ml-100k/u.data')

In [5]:
print(base_rdd.count())

100000


In [6]:
def split_function(line):
    info = line.split()
    user_id = int(info[0])
    movie_id = int(info[1])
    
    rating = float(info[2])
    
    return (user_id,(movie_id,rating))

In [7]:
user_movie_rating_rdd = base_rdd.map(split_function)

In [8]:
print(user_movie_rating_rdd.count())

100000


In [9]:
# print(user_movie_rating_rdd.collect())

In [10]:
joined_ratings_on_same_user_rdd = user_movie_rating_rdd.join(user_movie_rating_rdd)

In [11]:
print(joined_ratings_on_same_user_rdd.count())

20200812


In [12]:
print(joined_ratings_on_same_user_rdd.top(10))

[(943, ((1330, 3.0), (1330, 3.0))), (943, ((1330, 3.0), (1228, 3.0))), (943, ((1330, 3.0), (1188, 3.0))), (943, ((1330, 3.0), (1074, 4.0))), (943, ((1330, 3.0), (1067, 2.0))), (943, ((1330, 3.0), (1047, 2.0))), (943, ((1330, 3.0), (1044, 3.0))), (943, ((1330, 3.0), (1028, 2.0))), (943, ((1330, 3.0), (1011, 2.0))), (943, ((1330, 3.0), (943, 5.0)))]


In [13]:
def filter_function(user_movies):
    movie1,rating1 = user_movies[1][0]
    movie2,rating2 = user_movies[1][1]
    return movie1 < movie2

In [14]:
unique_joined_ratings_rdd = joined_ratings_on_same_user_rdd.filter(filter_function)

In [15]:
print(unique_joined_ratings_rdd.count())

10050406


In [16]:
print(unique_joined_ratings_rdd.top(10))

[(943, ((1228, 3.0), (1330, 3.0))), (943, ((1188, 3.0), (1330, 3.0))), (943, ((1188, 3.0), (1228, 3.0))), (943, ((1074, 4.0), (1330, 3.0))), (943, ((1074, 4.0), (1228, 3.0))), (943, ((1074, 4.0), (1188, 3.0))), (943, ((1067, 2.0), (1330, 3.0))), (943, ((1067, 2.0), (1228, 3.0))), (943, ((1067, 2.0), (1188, 3.0))), (943, ((1067, 2.0), (1074, 4.0)))]


In [17]:
# constructing movie id to movie name rdd
movie_item_rdd = sc.textFile('file:///Users/hdagar3/Documents/Spark_Things/Spark_Course_Files/ml-100k/u.item')

In [18]:
def map_function(line):
    info = line.split("|")
    movie_id = int(info[0])
    movie_name = info[1].encode('ascii','ignore')
    return (movie_id,movie_name.decode())

In [19]:
movieid_name_rdd = movie_item_rdd.map(map_function)

In [20]:
print(movieid_name_rdd.count())

1682


In [21]:
print(movieid_name_rdd.top(10))
# print(movieid_name_rdd.collect())

[(1682, 'Scream of Stone (Schrei aus Stein) (1991)'), (1681, 'You So Crazy (1994)'), (1680, 'Sliding Doors (1998)'), (1679, 'B. Monkey (1998)'), (1678, "Mat' i syn (1997)"), (1677, 'Sweet Nothing (1995)'), (1676, 'War at Home, The (1996)'), (1675, 'Sunchaser, The (1996)'), (1674, 'Mamma Roma (1962)'), (1673, 'Mirage (1995)')]


In [22]:
print(movieid_name_rdd.lookup(1680))
print(movieid_name_rdd.max())
print(movieid_name_rdd.min())   

# all above are actions

['Sliding Doors (1998)']
(1682, 'Scream of Stone (Schrei aus Stein) (1991)')
(1, 'Toy Story (1995)')


In [23]:
# NOW lets continue with unique_joined_ratings_rdd

In [24]:
def shuffle_movies_and_getaway_with_user(line):
    tuple_info = line[1]
    
    movie1,rating1 = tuple_info[0]
    movie2,rating2 = tuple_info[1]
    
    return ((movie1,movie2),(rating1,rating2))

In [25]:
movies_ratings_rdd = unique_joined_ratings_rdd.map(shuffle_movies_and_getaway_with_user)

In [26]:
print(movies_ratings_rdd.count())

10050406


In [27]:
print(movies_ratings_rdd.top(5))

[((1679, 1680), (3.0, 2.0)), ((1678, 1680), (1.0, 2.0)), ((1678, 1679), (1.0, 3.0)), ((1675, 1676), (3.0, 2.0)), ((1672, 1681), (2.0, 3.0))]


In [28]:
grouped_movies_ratings_all_users_rdd = movies_ratings_rdd.groupByKey()
# whenever we apply groupByKey on rdd then as a value it gives you list of values along with a key
# so in this case value would be list of tuples with a key as unique movie pair

In [29]:
print(grouped_movies_ratings_all_users_rdd.count())

983206


In [30]:
print(grouped_movies_ratings_all_users_rdd.top(10))
# unique movie pair to list of tuples i.e rating tuples

[((1679, 1680), <pyspark.resultiterable.ResultIterable object at 0x10592d0b8>), ((1678, 1680), <pyspark.resultiterable.ResultIterable object at 0x10592d940>), ((1678, 1679), <pyspark.resultiterable.ResultIterable object at 0x10592dc50>), ((1675, 1676), <pyspark.resultiterable.ResultIterable object at 0x101d4c5f8>), ((1672, 1681), <pyspark.resultiterable.ResultIterable object at 0x1056d8390>), ((1669, 1670), <pyspark.resultiterable.ResultIterable object at 0x105918630>), ((1668, 1670), <pyspark.resultiterable.ResultIterable object at 0x10592db38>), ((1668, 1669), <pyspark.resultiterable.ResultIterable object at 0x105918e80>), ((1667, 1670), <pyspark.resultiterable.ResultIterable object at 0x10592deb8>), ((1667, 1669), <pyspark.resultiterable.ResultIterable object at 0x10592d898>)]


In [31]:
# Now it is time to compute cosine similarity
# Remember that while applying mapValues(), key would remain the same --> only change will be reflected to values 
# and keys would remain the same, so [key would map to modifiedValue]

# Very important thing is : while applying flatMapValues(), then one value associated with key (one element) would 
# be converted to many values with a same key (many elements in RDD)

In [32]:
from math import sqrt
def cosineSimilarity(ratings):
    list_of_ratings_tuple = ratings
    sum_xx = sum_yy = sum_xy = 0
    no_of_users_rated = 0
    for rating_tuple in list_of_ratings_tuple:
        sum_xx += rating_tuple[0] * rating_tuple[0]
        sum_yy += rating_tuple[1] * rating_tuple[1]
        sum_xy += rating_tuple[0] * rating_tuple[1]
        no_of_users_rated += 1
        
    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    
    rating = 0
    if denominator:
        rating = float(numerator)/denominator
    
    return (rating,no_of_users_rated)

In [33]:
movies_similarities_rdd = grouped_movies_ratings_all_users_rdd.mapValues(cosineSimilarity).cache()

In [34]:
print(movies_similarities_rdd.count())
# As we cached movies_similarities_rdd, so execution of movies_similarities_rdd.count() would speed up  

983206


In [35]:
print(movies_similarities_rdd.top(10))

[((1679, 1680), (1.0, 1)), ((1678, 1680), (1.0, 1)), ((1678, 1679), (1.0, 1)), ((1675, 1676), (1.0, 1)), ((1672, 1681), (1.0, 1)), ((1669, 1670), (1.0, 1)), ((1668, 1670), (1.0, 1)), ((1668, 1669), (1.0, 1)), ((1667, 1670), (1.0, 1)), ((1667, 1669), (1.0, 1))]


In [36]:
# Top 10 movies similar to the movie provided here
movie_id = 50

In [37]:
# closer the cosine similarity is to the one, more better is the situation
# cosine similarity as 1 is ideal

def filter_out_given_movies(line):
    movie1,movie2 = line[0]
    similarity = line[1][0]
    no_of_users_rated = line[1][1]
    
    if (movie1 == movie_id or movie2 == movie_id) and similarity > 0.97 and no_of_users_rated > 50:
        return True
    return False

In [38]:
filtered_movie_similarities_rdd = movies_similarities_rdd.filter(filter_out_given_movies)

In [39]:
reverse_rdd_to_sort = filtered_movie_similarities_rdd.map(lambda x: (x[1],x[0]))

In [40]:
top_ten_similar_movie_list = reverse_rdd_to_sort.sortByKey(ascending=False).take(10)
# take is an action which returns top 10 elements of rdd in a list
# So take() is very similar to the top() function

In [41]:
print(type(top_ten_similar_movie_list))
print(top_ten_similar_movie_list)

<class 'list'>
[((0.9895522078385338, 345), (50, 172)), ((0.9857230861253026, 480), (50, 181)), ((0.981760098872619, 380), (50, 174)), ((0.9789385605497993, 68), (50, 141)), ((0.9776576120448436, 109), (50, 178)), ((0.9775948291054827, 92), (50, 408)), ((0.9764692222674887, 138), (50, 498)), ((0.9751512937740359, 204), (50, 194)), ((0.9748681355460885, 103), (50, 169)), ((0.9741816128302572, 58), (50, 114))]


In [43]:
print(f'Top 10 movies which are similar to {movieid_name_rdd.lookup(movie_id)[0]} movie are :')

for item in top_ten_similar_movie_list:
    similarity = item[0][0]
    no_of_users_rated = item[0][1]
    
    movie1 = item[1][0]
    movie2 = item[1][1]
    
    other_movie_id = movie1
    if movie1 == movie_id:
        other_movie_id = movie2
    print(f'{movieid_name_rdd.lookup(other_movie_id)[0]} movie with {similarity} similarity \
          and {no_of_users_rated} no of users rated')

Top 10 movies which are similar to Star Wars (1977) movie are :
Empire Strikes Back, The (1980) movie with 0.9895522078385338 similarity           and 345 no of users rated
Return of the Jedi (1983) movie with 0.9857230861253026 similarity           and 480 no of users rated
Raiders of the Lost Ark (1981) movie with 0.981760098872619 similarity           and 380 no of users rated
20,000 Leagues Under the Sea (1954) movie with 0.9789385605497993 similarity           and 68 no of users rated
12 Angry Men (1957) movie with 0.9776576120448436 similarity           and 109 no of users rated
Close Shave, A (1995) movie with 0.9775948291054827 similarity           and 92 no of users rated
African Queen, The (1951) movie with 0.9764692222674887 similarity           and 138 no of users rated
Sting, The (1973) movie with 0.9751512937740359 similarity           and 204 no of users rated
Wrong Trousers, The (1993) movie with 0.9748681355460885 similarity           and 103 no of users rated
Wallace 

In [80]:
# you can take any rdd and then call .saveAsTextFile(<name_of_file>) on that rdd to save results as a text file

# END