# <p style="text-align: center;">MIS 285N: Big Data and Distributed Programming</p>
# <p style="text-align: center;">Project - 1 : Apache Spark</p>
## <p style="text-align: center;">Instructor: Dr. Ramesh Yerraballi</p>
## <p style="text-align: center;">Due: Tuesday, September 14th submitted via Canvas by 11:59 pm</p>

Your work should be written in a **Jupyter notebook**.   

Also, please make sure your code runs in your notebook before submitting.

**Note:**

This project is based on the MapReduce framework. In it you will work with Spark and learn how Spark works, what functionality Spark provides, what the MapReduce framework does, and why it is useful.
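The map, shuffle, and reduce phases of MapReduce can be simulated in plain Python (no Spark) on a few made-up triplets. The example below totals the plays per song. The users, song IDs, and counts are hypothetical.

```python
from collections import defaultdict

# hypothetical (user, song, play count) triplets
triplets = [
    ("u1", "SOA", 5),
    ("u2", "SOA", 2),
    ("u1", "SOB", 1),
    ("u3", "SOB", 4),
]

# map: emit one (key, value) pair per record, keyed by song
mapped = [(song, plays) for _user, song, plays in triplets]

# shuffle: group all values that share the same key
grouped = defaultdict(list)
for song, plays in mapped:
    grouped[song].append(plays)

# reduce: combine the values for each key into a single total
total_plays = {song: sum(values) for song, values in grouped.items()}
print(total_plays)  # {'SOA': 7, 'SOB': 5}
```

In Spark, roughly the same computation on an RDD of split triplet lines would be `triplet_rdd.map(lambda t: (t[1], int(t[2]))).reduceByKey(lambda a, b: a + b)`. Spark distributes the shuffle across partitions.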

In this project you will implement a basic song recommender system. You will be given a dataset made up of multiple csv files containing song play counts and song information.

The data you will use is provided in a zip file along with this notebook. The __msd.zip__ archive contains:
1. **'kaggle_visible_evaluation_triplets.txt'**. We will use the visible part of the test data to understand how Apache Spark works. Each user's listening history is provided as (user, song, play count) triplets.  
2. In the **'kaggle_songs.txt'** file, each song is assigned an index so that songs are easier to represent.  
3. The **'kaggle_users.txt'** file is the canonical list of user identifiers.
4. Take **'MSDChallengeGettingstarted.pdf'** as your reference.



### **What to turn in?**  

A zip file containing:
1. The Jupyter notebook.
2. A brief report in PDF format describing the features you used for recommendation and briefly explaining the flow of your code, for example, what each RDD does and why it was created.
3. A datasets folder with the csv files you use in your notebook.
4. The notebook should use relative paths to the csv files in the datasets folder.
5. Name of the zip file: `<your_name>_<your_partner_name>.zip`

This project consists of 4 questions:  

1. Create an RDD from _kaggle_visible_evaluation_triplets.txt_ and replace each song name with its song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings from the song play counts as normalized scores between 0 and 1. 
3. Identify popular songs based on these ratings and, for a given user ID, recommend songs to that user using the algorithm from the movie recommender system covered in class. 
4. Using the cosine similarity function, compute the pairwise similarity between every pair of users and report the top 5 most similar user pairs, with no user appearing in more than one pair. 

The list above gives a high-level overview of the questions. 

In [1]:
### Starter code ####
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext.getOrCreate(conf = conf)
#### These lines are to tell jupyter where to find Apache Spark ####



In [2]:
#sc.stop()

In [3]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"msd/kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

# triplets are in following format: [user, song, # of plays]
test = triplet_rdd.collect()

                                                                                

## Step 1: 
Replace each song name with its song index and identify the number of songs without any listening history
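The Step 1 logic can be sketched in plain Python on toy data, without Spark. The sketch assumes the line formats that the code below splits on: space-separated `SONG_ID index` lines in kaggle_songs.txt and tab-separated `user, song, play count` lines in the triplets file. All song IDs and users here are made up.

```python
# hypothetical lines in the two file formats
song_lines = ["SOAAA 1", "SOBBB 2", "SOCCC 3", "SODDD 4"]
triplet_lines = ["u1\tSOBBB\t3", "u2\tSOBBB\t1", "u2\tSODDD\t2"]

# map each song ID to its integer index
song_to_index = {}
for line in song_lines:
    song_id, index = line.strip().split(" ")
    song_to_index[song_id] = int(index)

# replace the song ID in each triplet with its index
indexed_triplets = []
for line in triplet_lines:
    user, song_id, plays = line.strip().split("\t")
    indexed_triplets.append((user, song_to_index[song_id], int(plays)))

# songs without listening history = all song indexes minus those that appear in the triplets
listened = {song for _user, song, _plays in indexed_triplets}
never_played = sorted(set(song_to_index.values()) - listened)
print(never_played)  # [1, 3]
```

The set difference at the end is the same idea as the `subtract` call used on the RDDs below.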

In [4]:
songs_rdd = sc.textFile(r"msd/kaggle_songs.txt").map(lambda line: line.split(" "))

# code adapted from the provided MSD Challenge PDF
# creates a dictionary that maps song IDs to their corresponding index values
with open('msd/kaggle_songs.txt', 'r') as f:
    song_to_index = dict(map(lambda line: line.strip().split(' '), f.readlines()))

# keeps x[0] (user) and x[2] (play count) the same but converts song IDs into their corresponding index values
name_replaced_rdd = triplet_rdd.map(lambda x: [x[0], song_to_index[x[1]], x[2]]).sortBy(lambda x: x[1])

                                                                                

In [5]:
# this RDD contains only songs that are included in the triples file i.e., songs that have user listening history
distinct_songs_rdd = name_replaced_rdd.map(lambda x: int(x[1])).distinct().sortBy(lambda x: x)
distinct_songs_rdd.take(10) # should output [5, 6, 10, 13, 16,...]

                                                                                

[5, 6, 10, 13, 16, 20, 22, 29, 30, 32]

In [6]:
# this RDD contains only song indexes i.e., no song IDs
songs_indexes_only_rdd = songs_rdd.map(lambda x: int(x[1]))
# this RDD takes the list of all songs (created above) and removes all songs with listening history (distinct_songs_rdd)
# the resulting RDD is all songs without listening history 
songs_without_listening_history_rdd = songs_indexes_only_rdd.subtract(distinct_songs_rdd).sortBy(lambda x: x)

                                                                                

In [7]:
print(songs_without_listening_history_rdd.take(10))
num_of_songs_without_listen_history = songs_without_listening_history_rdd.count()
print("Number of songs without listening history: " + format(num_of_songs_without_listen_history, ','))

                                                                                

[1, 2, 3, 4, 7, 8, 9, 11, 12, 14]
Number of songs without listening history: 223,007


## Step 2:
Generate song ratings based on play counts. For example, if song_1 is played 5 times, song_2 is played 10 times, and song_3 is played 5 times, the normalized rating scores should be 0.25, 0.5, and 0.25 respectively (each count divided by the total of 20 plays). 
Generate ratings for all songs in the same way. If you normalize over all songs, the ratings are almost always very low, so think about the best way to convert play counts into ratings. (Hint: try generating ratings from each user's own play history.)
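The normalization in the example above, and the per-user variant suggested by the hint, can be sketched in plain Python. The per-user histories are hypothetical. The per-user idea is the same one `calc_user_pop` applies to each RDD row below.

```python
def normalize(play_counts):
    """Divide each song's play count by the total number of plays in the dict."""
    total = sum(play_counts.values())
    return {song: count / total for song, count in play_counts.items()}

# play counts from the example above
plays = {"song_1": 5, "song_2": 10, "song_3": 5}
print(normalize(plays))  # {'song_1': 0.25, 'song_2': 0.5, 'song_3': 0.25}

# per-user version: normalize each user's history separately (hypothetical users)
history = {
    "user_a": {"s1": 1, "s2": 3},
    "user_b": {"s1": 10, "s3": 10},
}
per_user_ratings = {user: normalize(counts) for user, counts in history.items()}
print(per_user_ratings)
# {'user_a': {'s1': 0.25, 's2': 0.75}, 'user_b': {'s1': 0.5, 's3': 0.5}}
```

Because each user's ratings sum to 1, a song that dominates one user's listening gets a high rating even if its global play count is small.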

### Putting songs and # of listens into lists and then grouping by user ID

In [8]:
#print(name_replaced_rdd.take(5))

# this RDD groups each song and # of listens into a list
combine_song_plays_rdd = name_replaced_rdd.map(lambda x: [x[0], [x[1], x[2]]])
# this RDD groups all rows by user ID resulting in each row being a unique user
# each unique user includes a list of lists where each sublist is a song and the number of times the user listened to it
grouped_by_user_rdd = combine_song_plays_rdd.groupByKey().mapValues(list)
print(grouped_by_user_rdd.take(2))


[('62543798a0eac1d057af421b188dc72d6fd48ff2', [['10', '1'], ['10827', '1'], ['148533', '1'], ['160828', '1'], ['190514', '1'], ['199981', '1'], ['253699', '1'], ['272391', '2'], ['293616', '1'], ['296902', '1'], ['301690', '1'], ['338648', '5'], ['342111', '1'], ['353019', '1'], ['369289', '2'], ['42181', '3'], ['71024', '1'], ['93102', '1']]), ('05b340468f70fee0b924b9163a26c75b39ec640b', [['10', '1'], ['11011', '1'], ['150661', '2'], ['15472', '1'], ['183190', '70'], ['211749', '1'], ['221730', '2'], ['268474', '2'], ['281851', '1'], ['284163', '1'], ['318024', '1'], ['322080', '1'], ['343663', '1'], ['353679', '2'], ['354992', '1'], ['363932', '1'], ['369289', '6'], ['37952', '1'], ['40311', '7'], ['74238', '1']])]


                                                                                

### Calculating relative song rating for each user

In [9]:
# this function calculates each user's relative rating for every song in a row of the grouped_by_user RDD
# it first calculates the total # of listens for the given user
# it then calculates each song's relative rating as (# of listens) / (total # of listens), rounded to 3 significant digits
def calc_user_pop(row):
    total_listens = 0
    for i in row:
        total_listens += int(i[1])

    new_song_list = []

    for i in range(len(row)):
        song_pop = float(format(int(row[i][1])/total_listens, ".3"))
        new_song_list.append([int(row[i][0]), song_pop])

    return(new_song_list)


user_pop_rdd = grouped_by_user_rdd.map(lambda x: [x[0], calc_user_pop(x[1])])

## Step 3: 
For a given user_id (choose one yourself) and a rating, recommend 5 other songs. One way to do this is to find another user who liked a song that this user liked, with a rating higher than the given rating, and then recommend 5 songs based on that matched user's ratings. 
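The matching idea above can be sketched in plain Python on a toy `{user: {song: rating}}` dictionary. The users, songs, and ratings are hypothetical. The function name `recommend` is illustrative only. Like the Spark cells below, it picks the other user who rated the chosen song highest above the threshold, then returns that user's other songs ordered by rating.

```python
# hypothetical per-user ratings: {user: {song: rating}}
ratings = {
    "me": {1: 0.2, 2: 0.8},
    "other_a": {1: 0.5, 3: 0.3, 4: 0.2},
    "other_b": {1: 0.1, 5: 0.9},
}

def recommend(ratings, user, song, threshold, k=5):
    """Recommend up to k songs from the user who rated `song` highest above `threshold`."""
    # other users who rated the chosen song above the threshold
    candidates = [
        (songs[song], other)
        for other, songs in ratings.items()
        if other != user and songs.get(song, 0) > threshold
    ]
    if not candidates:
        return []
    # the matched user is the one with the highest rating for that song
    _best_rating, matched_user = max(candidates)
    # rank the matched user's other songs by rating, highest first
    others = sorted(
        ((s, r) for s, r in ratings[matched_user].items() if s != song),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [s for s, _r in others[:k]]

print(recommend(ratings, "me", 1, 0.2))  # [3, 4]
```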

In [10]:
# I chose the most popular song for the given user and used their relative rating for that song as the threshold
given_user = '62543798a0eac1d057af421b188dc72d6fd48ff2'
given_song = 338648
given_rating = 0.192

### Finding all other users who liked the given song (not including the given user)

In [11]:
# this function returns True if the user listened to the given song and rated it higher than the given rating
def find_users_who_liked_song(row):
    contains_song_bool = False
    for i in row[1]:
        # i = [song, pop]
        if (str(i[0]) == str(given_song)) and (float(i[1]) > float(given_rating)):
            #print(i[1]) # highest pop for 338648 = 0.562
            contains_song_bool = True
    
    return contains_song_bool

# this function filters out the given user from the list of users who liked the given song    
def not_given_user(row):
    return row[0] != given_user


users_who_liked_song_rdd = user_pop_rdd.filter(find_users_who_liked_song)
uwls_minus_given_user = users_who_liked_song_rdd.filter(not_given_user)

print(uwls_minus_given_user.count()) # should be 19 for user = '62543798a0eac1d057af421b188dc72d6fd48ff2', song = 338648, rating = 0.192


19


                                                                                

### Finding the rows relating to the given song for each of the users found above

In [12]:
# this function reformats the RDD into the format [ [song, rating], user ] so that each song/rating pair is in its own list
def add_user_to_song_list(row):
    new_song_list = []
    for i in row[1]:
        #print(i)
        new_song_list.append([i, row[0]])

    return(new_song_list)

# this function finds only those rows that relate to the given song
def find_rows_with_given_song(row):
    if int(row[0][0]) == given_song:
        return True
    else:
        return False

# reformats RDD such that we have a list of lists where each sublist contains the song/rating pair and the associated user
add_user_to_song_list_rdd = uwls_minus_given_user.flatMap(add_user_to_song_list)
# filters the previously created RDD such that only rows relating to the given song are present
given_song_with_user_rdd = add_user_to_song_list_rdd.filter(find_rows_with_given_song)
# sort the previously created RDD by rating - this allows us to find the user who rated the given song the highest
sorted_by_rating = given_song_with_user_rdd.sortBy(lambda x: x[0][1], ascending=False)
print(sorted_by_rating.take(5))


[[[338648, 0.562], 'c44d6b25efcafe674a60e9073070bfc5ccf81a7b'], [[338648, 0.556], '172ab36adefde6f024c78062f5cd56a92d529fab'], [[338648, 0.542], '4bd779f7e52e2fbdbb67dc00644cb417ea0cdd36'], [[338648, 0.455], '291f5691468c10396b3b1e05e20958e0d9798a6f'], [[338648, 0.421], 'f0d5494bae08c6130c3363a218b68bba6baaeda8']]


                                                                                

### Finding other songs listened to by the user who rated the given song the highest

In [13]:
# this just grabs the username for the user who rated the given song the highest
user_with_highest_rating = sorted_by_rating.take(1)[0][1]
# this filters to retrieve only the songs listened to by the user who rated the given song the highest
highest_rated_user_rdd = uwls_minus_given_user.filter(lambda x: x[0]==user_with_highest_rating)
# this removes the user ID and sorts by the rating the user gave to each song they listened to
hru_songs_sorted_by_pop = highest_rated_user_rdd.flatMap(lambda x: x[1]).sortBy(lambda x: x[1], ascending=False)

                                                                                

In [14]:
# this filters out the given song from the list of songs listened to by the user who rated the given song the highest
hru_songs_minus_given_rdd = hru_songs_sorted_by_pop.filter(lambda x: int(x[0])!=given_song)
# this gives us the top five song/rating pairs for the user who rated the given song the highest
hru_top_five_minus_given = hru_songs_minus_given_rdd.take(5)

# the following code creates a list of just the song indexes for the top five songs
five_songs_to_recommend = []
for i in hru_top_five_minus_given:
    five_songs_to_recommend.append(i[0])

print(five_songs_to_recommend)

# these recommendations don't take into account that the given user may already have listened to them
# unlike movies, however, most people listen to songs more than once, so it seems reasonable to include songs that have already been listened to


[341056, 210533, 340684, 158979, 170370]


                                                                                

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If a user appears more than once in the top-5 set, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and generate song recommendations from those users' lists. 
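Points 2 and 3 above amount to a greedy selection. Sort all pairs by similarity, walk down the list, and skip any pair that reuses a user already chosen. The sketch below uses made-up user labels and scores. Note that greedy selection follows the "take the next best pair" rule but does not necessarily maximize the total similarity of the chosen pairs.

```python
# hypothetical pairwise similarity scores: (user_i, user_j, score)
pairs = [
    ("a", "b", 0.9),
    ("a", "c", 0.8),
    ("c", "d", 0.7),
    ("b", "e", 0.6),
    ("e", "f", 0.5),
]

def top_k_disjoint_pairs(pairs, k=5):
    """Greedily keep the highest-scoring pairs, skipping any pair that reuses a user."""
    chosen, used = [], set()
    for u, v, score in sorted(pairs, key=lambda p: p[2], reverse=True):
        if u in used or v in used:
            continue  # one of these users is already in a better pair
        chosen.append((u, v, score))
        used.update((u, v))
        if len(chosen) == k:
            break
    return chosen

print(top_k_disjoint_pairs(pairs))
# [('a', 'b', 0.9), ('c', 'd', 0.7), ('e', 'f', 0.5)]
```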

### Importing Cosine Similarity Function from RecommenderSystems Notebook

In [15]:
import math

# function to calculate the dot product of two vectors
def dot(v,w):
    """ v_1*w_1 + ... v_n*w_n"""
    return sum(v_i *w_i
              for v_i,w_i in zip(v,w))

# function to calculate cosine similarity between two vectors
def cosine_similarity(v, w):
    return dot(v, w) / math.sqrt(dot(v, v) * dot(w, w))

# converting the user_pop and distinct_songs RDDs into lists for easier manipulation
user_pop_list = user_pop_rdd.collect()
distinct_songs_list = distinct_songs_rdd.collect()

print(user_pop_list[0:2]) # TO DO: create a dictionary for each user that maps their song to their rating

                                                                                

[['62543798a0eac1d057af421b188dc72d6fd48ff2', [[10, 0.0385], [10827, 0.0385], [148533, 0.0385], [160828, 0.0385], [190514, 0.0385], [199981, 0.0385], [253699, 0.0385], [272391, 0.0769], [293616, 0.0385], [296902, 0.0385], [301690, 0.0385], [338648, 0.192], [342111, 0.0385], [353019, 0.0385], [369289, 0.0769], [42181, 0.115], [71024, 0.0385], [93102, 0.0385]]], ['05b340468f70fee0b924b9163a26c75b39ec640b', [[10, 0.00962], [11011, 0.00962], [150661, 0.0192], [15472, 0.00962], [183190, 0.673], [211749, 0.00962], [221730, 0.0192], [268474, 0.0192], [281851, 0.00962], [284163, 0.00962], [318024, 0.00962], [322080, 0.00962], [343663, 0.00962], [353679, 0.0192], [354992, 0.00962], [363932, 0.00962], [369289, 0.0577], [37952, 0.00962], [40311, 0.0673], [74238, 0.00962]]]]

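The TO DO comments in this section suggest two improvements. One is mapping each user's songs to ratings in a dictionary. The other is using relative ratings instead of 0/1 values. A sparse cosine similarity addresses both and avoids building full-length vectors over every distinct song, which is what limits the sample size below. The sketch assumes each user is a `{song_index: rating}` dict. A `user_pop_list` row `[user, [[song, rating], ...]]` could be converted with `dict(row[1])`. The example users are hypothetical, and the function is not the course notebook's code.

```python
import math

def sparse_cosine_similarity(u, v):
    """Cosine similarity of two sparse vectors stored as {song_index: rating} dicts."""
    # only songs rated by both users contribute to the dot product
    common = set(u) & set(v)
    dot_uv = sum(u[s] * v[s] for s in common)
    norm_u = math.sqrt(sum(r * r for r in u.values()))
    norm_v = math.sqrt(sum(r * r for r in v.values()))
    if norm_u == 0 or norm_v == 0:
        return 0.0  # an empty history has no direction, so treat it as dissimilar
    return dot_uv / (norm_u * norm_v)

# hypothetical users; for a user_pop_list row, dict(row[1]) gives this shape
user_a = {10: 0.5, 20: 0.5}
user_b = {10: 1.0}
print(sparse_cosine_similarity(user_a, user_b))  # about 0.707
```

Each comparison costs time proportional to the two users' history lengths rather than the number of distinct songs.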

### Creating an RDD with each user and the songs in their listening history

In [16]:
# this function extracts only the song indexes from a user_pop_rdd row of the form [user, [[song, rating], ...]]
def songs_only(row):
    song_list = []
    for song_rating in row[1]:
        song_list.append(song_rating[0]) 

    return song_list

# remapping the user_pop_rdd into the format [[user1, [song1, song2, song3]], [user2, [song1, song2]], ...]
user_songs_only = user_pop_rdd.map(lambda x: [x[0], songs_only(x)])
user_songs_only_list = user_songs_only.collect()

                                                                                

### Step 4 Part 1: Calculating Cosine Similarity for Users

In [30]:
import random

# taking a random sample of 100 users because my computer is unable to create the required matrices for the entire dataset
user_sample = random.sample(user_songs_only_list, 100)

# function to create an interest matrix for each song for each user (within the user_sample)
# this function loops through every user in the user_sample and compares against every song in the distinct songs list
# a 1 value is assigned if the song is in the user's listening history and a 0 is assigned otherwise
# DESIRED UPDATE: rather than using binary system (0 or 1), use the user's relative popularity for the song in question
def create_song_interest_matrix(user_song_interests):
    user_song_interest_matrix = []

    for user_song in user_song_interests:
        # a set makes each membership test constant-time instead of scanning the user's song list
        user_songs = set(user_song[1])
        ind_user_matrix = [1 if song in user_songs else 0
                           for song in distinct_songs_list]

        user_song_interest_matrix.append(ind_user_matrix)

    return user_song_interest_matrix
        
# creating the song interest matrix for the user_sample
user_song_interest_matrix = create_song_interest_matrix(user_sample)
print('successfully created user song interest matrix') # provided because the code can take a while to run

# create a matrix of user cosine similarities for each user in the user_song_interest_matrix
user_similarities = [[cosine_similarity(interest_vector_i, interest_vector_j)
                      for interest_vector_j in user_song_interest_matrix]
                     for interest_vector_i in user_song_interest_matrix]

print('successfully created user_similarities matrix') # provided because the code can take a while to run

### TO DO ###
## INCORPORATE RELATIVE POPULARITY CALCULATIONS 
## add the relative popularity back into the user list and then rather than using a binary value, use the relative popularity 
## if a song is not in the user's history, it will be 0. if the song is in the user's history, it will be = to the song's relative popularity

successfully created user song interest matrix
successfully created user_similarities matrix


### Step 4 Part 2 & 3: Sorting Similarity Score and Finding Most Similar Users (Not Including Repeated Users)

In [31]:
similar_users_list = []

# collect the similarity of every pair of distinct users in the sample, skipping zero similarities
# i and j are the index values of the two users being compared; requiring j > i skips self-comparisons
# (which always have similarity 1) and stores each pair once instead of as both (i, j) and (j, i)
# unlike a "similarity != 1" check, this keeps two different users whose histories are identical
all_pairs = []
for i, user in enumerate(user_similarities):
    for j, similarity in enumerate(user):
        if j > i and similarity != 0:
            all_pairs.append([i, j, similarity])

# sort the pairs by similarity value, highest first
all_pairs.sort(key=lambda x: x[2], reverse=True)

# keep a pair only if neither of its users already appears in a higher-ranked pair that was kept,
# so no user appears more than once
used_users = set()
for user0, user1, similarity in all_pairs:
    if user0 in used_users or user1 in used_users:
        continue
    similar_users_list.append([user0, user1, similarity])
    used_users.update([user0, user1])

print("Top five most similar users and associated similarity score:", similar_users_list[0:5])


Top five most similar users and associated similarity score: [[3, 77, 0.2886751345948129], [33, 97, 0.21650635094610968], [8, 69, 0.21081851067789195], [4, 10, 0.18257418583505536], [61, 99, 0.18257418583505536]]


### Step 4 Part 4: Printing Most Similar Users for a Given User

In [34]:
# creating a list of usernames within the user_sample
username_list = [user[0] for user in user_sample]

# this dictionary uses the user ID as the key and the user's index in user_sample as the value, which lets us use user IDs to assess similarity
# the reverse dictionary maps each index back to its user ID
username_dict = {username: index for index, username in enumerate(username_list)}
username_dict_reverse = {index: username for index, username in enumerate(username_list)}

# this code was copied from the RecommenderSystems notebook
# given a user ID, this function finds all nonzero similarity scores for the given user compared against all other users
def most_similar_users_to(user_id):
    pairs = [(other_user_id, similarity)              
             for other_user_id, similarity in         
                enumerate(user_similarities[username_dict[user_id]]) 
             if username_dict[user_id] != other_user_id and similarity > 0] 

    # this returns a list of similarities sorted from most similar to least similar
    return sorted(pairs,                              
                  key=lambda pair: pair[1],           
                  reverse=True)                       

user_to_compare = random.sample(user_sample, 1)
utc_most_similar_users = most_similar_users_to(str(user_to_compare[0][0]))

# keep at most the five most similar users (slicing also works when there are fewer than five)
utc_most_similar_users = utc_most_similar_users[0:5]

print(utc_most_similar_users) # you may have to rerun this cell in case the user_to_compare doesn't have any similar users

[(61, 0.18257418583505536), (67, 0.13608276348795434), (98, 0.12309149097933272), (7, 0.10540925533894598), (47, 0.10206207261596577)]


### Recommending songs based on most similar users

In [35]:
# first things first: convert from index value back to user ID
similar_usernames = []
for pair in utc_most_similar_users:
    similar_usernames.append([username_dict_reverse[pair[0]], pair[1]])

# the loop below creates a flattened RDD for each of the similar users found above and then adds that user's top-rated songs to a list
songs_to_rec = []
for pair in similar_usernames:
    # the first filter finds the similar user, the flatMap lets us sort by rating, and the final filter keeps only ratings above 0.1
    flattened_rdd = user_pop_rdd.filter(lambda x: x[0]==pair[0]).flatMap(add_user_to_song_list).sortBy(lambda x: x[0][1], ascending=False)\
                                .filter(lambda x: x[0][1]>.1)
    
    for song in flattened_rdd.collect():
        songs_to_rec.append(song[0])

print(songs_to_rec)



[[223714, 0.254], [245588, 0.237], [335468, 0.203], [215861, 0.186], [282298, 0.119], [14397, 0.15], [282116, 0.15], [310215, 0.15], [368622, 0.272], [181765, 0.136], [280834, 0.136], [349184, 0.111], [14397, 0.251], [357810, 0.247], [88544, 0.24], [207699, 0.135], [218666, 0.135], [250188, 0.135], [362415, 0.135], [379742, 0.135]]


                                                                                

In [36]:
# we then sort by rating for songs from all users
songs_to_rec.sort(key= lambda x: x[1], reverse=True)

final_recommendation = []

# our final recommendations are the top 5 rated songs from the most similar users
for pair in songs_to_rec[0:5]:
    final_recommendation.append(pair[0])

print(final_recommendation)

[368622, 223714, 14397, 357810, 88544]

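The candidate list printed above contains song 14397 twice, once from each of two similar users. As the earlier comment notes, the cells above also do not exclude songs the given user has already heard. One possible refinement keeps each song once at its best rating and optionally drops songs already in the user's history. The function name, the candidate lists (song indexes and ratings taken from the output above, grouped arbitrarily), and the `already_heard` set are all illustrative assumptions.

```python
def pool_recommendations(candidate_lists, already_heard=(), k=5):
    """Merge [song, rating] lists from several users, keep each song's best rating,
    drop songs the target user already heard, and return the top-k song indexes."""
    best = {}
    for candidates in candidate_lists:
        for song, rating in candidates:
            if song in already_heard:
                continue
            best[song] = max(rating, best.get(song, 0.0))
    ranked = sorted(best.items(), key=lambda item: item[1], reverse=True)
    return [song for song, _rating in ranked[:k]]

# hypothetical candidate lists from two similar users
similar_a = [[14397, 0.15], [223714, 0.254]]
similar_b = [[14397, 0.251], [88544, 0.24]]
print(pool_recommendations([similar_a, similar_b], already_heard={88544}))
# [223714, 14397]
```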

#### The End