# <p style="text-align: center;">MIS 285N: Big Data and Distributed Programming</p>
# <p style="text-align: center;">Project - 1 : Apache Spark</p>
## <p style="text-align: center;">Instructor: Dr. Ramesh Yerraballi</p>
## <p style="text-align: center;">Due: Tuesday, September 14th submitted via Canvas by 11:59 pm</p>

Your work should be written in a **Jupyter notebook**.   

Also, please make sure your code runs in your notebook before submitting.

**Note:**

This project is based on Map-Reduce Framework. In these you will get to work with Spark and will get to know how 
does spark work, what functionalities does spark provide, what does map-reduce framework do and why is it useful. 

In this project you will be implementing a basic song recommender system. You will be given a dataset where there are multiple csv files. These csv files have data corresponding to song play count and song information.

The data you would be using will be provided in a zip file along with this notebook. The __msd.zip__ archive contains:
1. **'kaggle_visible_evaluation_triplets.txt'**. We will be using the visible part of the testing data to understand the working on Apache Spark.  The user's listening history is provided as: (user, song, play count).  
2. In **'kaggle_songs.txt'** file, each song is marked using an index for easier representation of songs.  
3. And **'kaggle_users.txt'** file is the canonical list of user identifiers.
4. Take **'MSDChallengeGettingstarted.pdf'** as your reference.



### **What to turn in?**  

A zip folder which will have:
1. Jupyter Notebook
2. A brief report in PDF format on what features you used for recommendation. And a brief explanation of flow of your code. For example,  what RDD does what or, why it was created.
3. datasets folder with the csv files you are using in your notebook.
4. Notebook should use relative path to the csv files in datasets folder.
5. Name of the zip folder - `<your_name>_<your_partner_name>.zip`

This project consists of 4 questions:  

1. Create an RDD with _kaggle_visible_evaluation_triplets.txt_ and replace the song name with the song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify the popular song based on this rating and recommend songs to user, given user id based on the algorithm used in Movie recommender system from class. 
4. Using Cosine similarity function, identify pair-wise similarity between each pair of users and generate the top 5 most similar users without an overlap in users. 

The above list is the higer level idea about the questions. 

In [1]:
### Starter code ####
import findspark
findspark.init('/opt/homebrew/Cellar/apache-spark/3.1.2/libexec')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext(conf = conf)
#### These lines are to tell jupyter where to find Apache Spark ####

21/11/27 14:05:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
#sc.stop()

In [3]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"msd/kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 
triplet_rdd.take(5)

## Step 1: 
Replace song name with song index and identify the number of songs without user history

In [6]:
songs_index_rdd = sc.textFile(r"msd/kaggle_songs.txt").map(lambda line: line.split(" "))
songs_index_dict = songs_index_rdd.collectAsMap()

# Replace song name with song index
triplet_rdd_index = triplet_rdd.map(lambda x: [x[0],songs_index_dict[x[1]], x[2]])

In [4]:
# identify the number of songs without user history
# compare all the songs from kaggle_songs.txt with distinct songs fromtriplets file and 
# get number of songs with no play count data

# check if any user history contains empty or null values
count_invalid_user_history = triplet_rdd_index.filter(lambda x: x[2] == ' ' or x[2] == 0 or not x[2]).count()

distinct_songs_with_history = triplet_rdd_index.map(lambda x: x[1]).distinct()
print("Number of songs without user history are: " + str(count_invalid_user_history +
                                                         songs_index_rdd.count() - 
                                                         distinct_songs_with_history.count()))

Number of songs without user history are: 223007


## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate the rating for all the songs. You may notice that based on all songs, the rating is almost always very low. So, think of the best way to convert song count to ratings. (Hint: Try generating ratings based on each user's song play history)

In [5]:
song_ratings_rdd = triplet_rdd_index.map(lambda x: (x[1], x[2]))
song_ratings_rdd = song_ratings_rdd.reduceByKey(lambda x,y: int(x)+int(y))
total_frequency_of_songs = song_ratings_rdd.map(lambda x: x[1]).reduce(lambda x,y: int(x)+int(y))
song_ratings_rdd_q2 = song_ratings_rdd.map(lambda x: (x[0], (int(x[1])/int(total_frequency_of_songs))))
song_ratings_rdd_q2.take(5)

[('185057', 2.097596630005579e-05),
 ('272059', 2.1624707525830714e-07),
 ('219307', 1.7299766020664571e-06),
 ('275648', 1.9894730923764256e-05),
 ('231909', 3.4599532041329143e-06)]

In [None]:
# OBSERVATION
# Here, above we can see that the ratings are very small. 
# Instead of dividing by total number of song plays for any user and any song,
# we will divide by the total number of song plays of users that have played that song atleast once

In [6]:
# Create new RDD with user id as key and song index and history as values
user_songs_history_rdd = triplet_rdd_index.map(lambda x : (x[0], (x[1], (int(x[2])))))

# Create rdd by summing the user history for every user
user_history_rdd = triplet_rdd.map(lambda x: (x[0],int(x[2]))).reduceByKey(lambda x, y: x+y)

# Combine the above two rdds
user_songs_with_all_history = user_history_rdd.join(user_songs_history_rdd)

# For every user, create a new rating for every song
user_ratings_rdd = user_songs_with_all_history.map(lambda x: (x[0], (x[1][1][0], (x[1][1][1] / x[1][0]))))

# Generate rating for every song by this expression-  (sum of ratings by a user for a song/total number of users that have history for that song) 
song_ratings_rdd = user_ratings_rdd.map(lambda x : (x[1][0], x[1][1])).groupByKey().map(lambda x : (x[0], sum(list(x[1])) / len(list(x[1]))))
song_ratings_rdd.take(5)

[('315822', 0.1018128879046919),
 ('243463', 0.07172266994229386),
 ('218288', 0.052246960074540875),
 ('283892', 0.05617080276802812),
 ('157983', 0.053211254755933766)]

## Step 3: 
For a given user_id (choose one by yourselves), rating, recommend 5 other songs from the list. One way to do this is based on another user who liked the same song liked by this user with rating more than the given rating and recommend the 5 songs based on the matched user's rating. 

In [7]:
# Given user id, given rating is defined below
given_userid = 'd7083f5e1d50c264277d624340edaaf3dc16095b'

#Filter records  for given user
user_filtered_rdd = user_ratings_rdd.filter(lambda x : x[0] == given_userid)

# Take any song for this user
song_1 = user_filtered_rdd.collect()[0][1][0]

# Given rating which will be used
given_rating = user_filtered_rdd.collect()[0][1][1]

#get users who have rated/listened to the selected song
songs_users_mapping_list = user_ratings_rdd.filter(lambda x : x[1][0] == song_1).map(lambda x: x[0]).collect()

filtered_similar_user_play_songs_rating = user_ratings_rdd.filter(lambda x : x[0] in songs_users_mapping_list and x[1][0] != song_1 and x[0] != given_userid)
# # song_list_rating.collect()

song_list_rating_suggestion = filtered_similar_user_play_songs_rating.filter(lambda x : x[1][1] > given_rating).takeOrdered(5,key = lambda x : - x[1][1])
song_list_rating_suggestion = [x[1] for x in song_list_rating_suggestion]
print("The 5 recommended songs and their ratings for given user id: " + str(given_userid) + " with rating more than: " + str(given_rating) + " are:\n")
song_list_rating_suggestion

The 5 recommended songs and their ratings for given user id: d7083f5e1d50c264277d624340edaaf3dc16095b with rating more than: 0.058823529411764705 are:



[('341964', 0.6896551724137931),
 ('212139', 0.65625),
 ('177574', 0.6326530612244898),
 ('172688', 0.6305882352941177),
 ('343823', 0.5454545454545454)]

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If the top-5 user set has an user appearing more than once, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list. 

In [8]:
import math
def dot(v,w):
    """ v_1*w_1 + ... v_n*w_n"""
    return sum(v_i *w_i
              for v_i,w_i in zip(v,w))
def cosine_similarity(v, w):
    return dot(v, w) / math.sqrt(dot(v, v) * dot(w, w))

In [9]:
user_songs_dict = triplet_rdd_index.map(lambda x:(x[0], x[1])).groupByKey().mapValues(list).collectAsMap()
unique_users_list = []
unique_songs_list = []
for key in dict(list(user_songs_dict.items())[0: 500]):
    unique_users_list.append(key)
    for song in user_songs_dict[key]:
        if song not in unique_songs_list:
            unique_songs_list.append(song)

# Generate the user matrix            
user_interest_matrix = [[1 if song in user_songs_dict[user] else 0 for song in unique_songs_list] for user in unique_users_list]

In [10]:
#4.1 Compute cosine similarity between all pairs of users.
user_similarities = [[cosine_similarity(interest_vector_i, interest_vector_j)
                      for interest_vector_j in user_interest_matrix]
                     for interest_vector_i in user_interest_matrix]
user_similarities

[[1.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.07669649888473704,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.1690308509457033,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0

In [11]:
# 4.2 Sort the similarity score and print the top-5 similar users.
print(sorted(user_similarities, reverse=True))

[[1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.07669649888473704, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.1690308509457033, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.10259783520851541, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0

In [4]:
# 4.2 print the top-5 similar users.
# Calculate the index of top 5 Cij values where i and j are different
max_value = 0
max_pair = second_max_pair = third_max_pair = fourth_max_pair = fifth_max_pair = ""
second_max_value = 0
third_max_value = 0
fourth_max_value = 0
fifth_max_value = 0
max_5_index_pairs = []
for i in range(len(user_similarities)):
    for j in range(len(user_similarities[0])):
        if user_similarities[i][j] > max_value and i!=j:
            max_value = user_similarities[i][j]
            max_pair = str(i) + "," + str(j)
        elif user_similarities[i][j] < max_value and user_similarities[i][j] > second_max_value and i!=j:
            second_max_value = user_similarities[i][j]
            second_max_pair = str(i) + "," + str(j)
        elif user_similarities[i][j] < second_max_value and user_similarities[i][j] > third_max_value and i!=j:
            third_max_value = user_similarities[i][j]
            third_max_pair = str(i) + "," + str(j)
        elif user_similarities[i][j] < third_max_value and user_similarities[i][j] > fourth_max_value and i!=j:
            fourth_max_value = user_similarities[i][j]
            fourth_max_pair = str(i) + "," + str(j)
        elif user_similarities[i][j] < fourth_max_value and user_similarities[i][j] > fifth_max_value and i!=j:
            fifth_max_value = user_similarities[i][j]
            fifth_max_pair = str(i) + "," + str(j)

print("Top 5 similar users are:")
print(max_pair)
print(second_max_pair)
print(third_max_pair)
print(fourth_max_pair)
print(fifth_max_pair)

In [17]:
# 4.4 For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list.

def most_similar_users_to(user_id):
    user_id = unique_users_list.index(user_id)
    pairs = [(other_user_id, similarity)              # find other
             for other_user_id, similarity in         # users with
                enumerate(user_similarities[user_id]) # nonzero
             if user_id != other_user_id and similarity > 0]  # similarity

    return sorted(pairs,                              # sort them
                  key=lambda pair: pair[1],           # most similar
                  reverse=True)                       # first

# Let's find top-5 similar users to the following user id
most_similar_users_to("296a727e7328216a7493537a960beb6f73f828f5")[:5]

[(76, 0.08304547985373997),
 (204, 0.08304547985373997),
 (423, 0.08304547985373997),
 (195, 0.07580980435789034),
 (4, 0.07018624063435965)]

In [20]:
# To make sense of the above data print user id's
most_similar_users = most_similar_users_to("296a727e7328216a7493537a960beb6f73f828f5")[:5]
print("The top 5 similar users to " + unique_users_list[2] + " are:\n")
for user in most_similar_users:
    print(unique_users_list[list(user)[0]])

The most similar users to f37853d3600c715e6003edbf35ea3dd31a17ed2a are:

10d845c4220b20dc49f2e24d807c2f0527b2c9b0
eb32c9ab369b1dfbe1646d18d4a29e78bbebca0d
afa84c08f45d515de561dc607f796fca9cce5f31
e699bb14ad602c74adf56300c899f03adb3c479d
3d6a63dd3ef66b390906ff345779d2a6e1e5cc7c


In [21]:
# Song Recommendations from other user's list
songs_listened_by_other_similar_users = []
songs_listened_by_given_user_id = []
songs_listened_by_given_user_id = user_songs_dict[unique_users_list[2]]
for user in most_similar_users:
    for song in user_songs_dict[unique_users_list[list(user)[0]]]:
        # Remove duplicates and make sure to not suggest song already liked by user
        if song not in songs_listened_by_other_similar_users and song not in songs_listened_by_given_user_id:
            songs_listened_by_other_similar_users.append(song)

print("Following are the song recommendations for user 296a727e7328216a7493537a960beb6f73f828f5 sorted by rating:\n")
song_and_rating_dict = {}
song_ratings_rdd_dict = song_ratings_rdd.collectAsMap()
for song in songs_listened_by_other_similar_users:
    song_and_rating_dict[song] = song_ratings_rdd_dict[song]
sorted_dict = dict(sorted(song_and_rating_dict.items(),
                           key=lambda item: item[1],
                           reverse=True))
print(list(sorted_dict.keys()))


Following are the song recommendations for user 296a727e7328216a7493537a960beb6f73f828f5 sorted by rating:

['354895', '221459', '305991', '246782', '237399', '307202', '319911', '177486', '266750', '205821', '49781', '89137', '123457', '311262', '347029', '48851', '289658', '343475', '256559', '23375', '119674', '294389', '361476', '65618', '222655', '86545', '325817']
