This project is based on the MapReduce framework. In it, you will work with Spark and learn how Spark works, what functionality Spark provides, what the MapReduce framework does, and why it is useful.

In this project you will implement a basic song recommender system. You will be given a dataset made up of several delimited text files containing song play counts and song information.

The data you will use is provided in a zip file along with this notebook. The __msd.zip__ archive contains:
1. **'kaggle_visible_evaluation_triplets.txt'**. We will use the visible part of the test data to understand how Apache Spark works. Each user's listening history is provided as (user, song, play count) triplets.
2. In **'kaggle_songs.txt'**, each song is mapped to an integer index for easier representation.
3. **'kaggle_users.txt'** is the canonical list of user identifiers.
4. Take **'MSDChallengeGettingstarted.pdf'** as your reference.


This project consists of 4 questions:  

1. Create an RDD with _kaggle_visible_evaluation_triplets.txt_ and replace the song name with the song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify the popular songs based on this rating and recommend songs to a user, given a user id, based on the algorithm used in the movie recommender system from class.
4. Using the cosine similarity function, compute the pairwise similarity between each pair of users and find the top 5 most similar users without any user appearing more than once.

The above list is a high-level overview of the questions.

In [1]:
### Starter code ####
import findspark
findspark.init('C:\\apachespark')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext(conf = conf)
#### These lines are to tell jupyter where to find Apache Spark ####

In [2]:
#sc.stop()

In [3]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"msd/kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t"))
#triplet_rdd.collect()

## Step 1: 
Replace the song name with the song index and identify the number of songs without user history.
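The left outer join followed by a `None` filter in the cell below amounts to a set difference between all song indices and the indices that appear in the triplets. A minimal pure-Python sketch of that idea, using a few made-up sample rows (the IDs, counts, and variable names are hypothetical, not taken from the dataset). In PySpark, `RDD.subtractByKey` expresses the same difference in a single call.

```python
# Hypothetical sample rows in the same layout as the real files:
# kaggle_songs.txt: "<song_id> <index>", triplets: "<user>\t<song_id>\t<play_count>"
song_lines = ["SOA 1", "SOB 2", "SOC 3", "SOD 4"]
triplet_lines = ["u1\tSOA\t3", "u2\tSOA\t1", "u2\tSOC\t7"]

# song_id -> index, mirroring song_rdd
song_index_map = dict(line.split(" ") for line in song_lines)

# Replace the song ID in each triplet with its index: (index, [user, play_count])
sample_history = []
for line in triplet_lines:
    user, song_id, count = line.split("\t")
    sample_history.append((song_index_map[song_id], [user, int(count)]))

# Songs whose index never appears in the history have no play history
played = {index for index, _ in sample_history}
no_history = sorted(set(song_index_map.values()) - played)
print(no_history)       # ['2', '4']
print(len(no_history))  # 2
```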

In [4]:
song_rdd = sc.textFile(r"msd/kaggle_songs.txt") \
    .map(lambda line: line.split(" "))

def noUserHistory(row):
    # row is (song index, user info); user info is None when no user played the song
    return row[1] is None

# Rearranging the RDD
# (MSD song ID, [user, play count])
replaced_rdd = triplet_rdd.map(lambda x: (x[1], [x[0], x[2]]))

# Joining with song_rdd (MSD song ID, song index) to replace each song ID with its index

# (song index, [user, play count])
user_history = replaced_rdd.join(song_rdd).map(lambda x : (x[1][1], [x[1][0][0], x[1][0][1]]))

# Rearranging the RDD
# (song index, MSD song ID)
song_rdd = song_rdd.map(lambda x : (x[1], x[0]))

# Combining all the information about the songs into one RDD
# A song that no user has played gets None as its user information
# (song index, user information)
no_user_history = song_rdd.leftOuterJoin(user_history).map(lambda x : (x[0],x[1][1]))

# Filtering out every song that has been played, keeping only songs with no history
no_user_history = no_user_history.filter(noUserHistory)
no_user_history.take(5)
no_user_history.count()

223007

## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate the rating for all the songs. You may notice that when normalizing over all songs, the ratings are almost always very low. So think about the best way to convert play counts to ratings. (Hint: Try generating ratings based on each user's song play history)
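To make the hint concrete: normalizing by each user's own total play count turns counts into per-user shares that sum to 1, instead of tiny fractions of the whole dataset. A minimal sketch with the example numbers from the text plus two made-up users (the helper name `normalize_counts` is hypothetical):

```python
def normalize_counts(play_counts):
    """Divide each song's play count by the total play count (hypothetical helper)."""
    total = sum(play_counts.values())
    return {song_id: count / total for song_id, count in play_counts.items()}

# The example from the text
ratings = normalize_counts({"song_1": 5, "song_2": 10, "song_3": 5})
print(ratings)  # {'song_1': 0.25, 'song_2': 0.5, 'song_3': 0.25}

# Applied per user, every user's ratings sum to 1 regardless of dataset size
history = {
    "user_a": {"song_1": 2, "song_2": 2},
    "user_b": {"song_2": 1, "song_3": 3},
}
per_user = {user: normalize_counts(counts) for user, counts in history.items()}
print(per_user["user_b"])  # {'song_2': 0.25, 'song_3': 0.75}
```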

In [5]:
user_rdd = triplet_rdd.map(lambda x: (x[0], [x[1], x[2]]))

# Adding all the song play counts for a user
user_playcount_rdd = triplet_rdd.map(lambda x: (x[0], x[2]))
user_total_playcount_rdd = user_playcount_rdd.reduceByKey(lambda a, b : str(float(a) + float(b)))

# Adding all the play counts together for each song 
song_playcount_rdd = triplet_rdd.map(lambda x: (x[1], x[2]))
song_total_playcount_rdd = song_playcount_rdd.reduceByKey(lambda a, b : str(float(a) + float(b)))

# Placing the total user playcount in an rdd with the user and the play count for a song
user_rdd = user_rdd.join(user_total_playcount_rdd)
song_total_playcount_rdd.take(5)

[('SOBONKR12A58A7A7E0', '35432.0'),
 ('SOFLJQZ12A6D4FADA6', '7895.0'),
 ('SOHTKMO12AB01843B0', '10515.0'),
 ('SOXLOQG12AF72A2D55', '4671.0'),
 ('SOZPZGN12A8C135B45', '39.0')]
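The cell above accumulates totals as strings (`str(float(a) + float(b))`), which works but converts back and forth on every addition. Converting play counts to `int` once in the initial `map` keeps the reducer a plain addition; in PySpark that would look like `triplet_rdd.map(lambda x: (x[1], int(x[2]))).reduceByKey(lambda a, b: a + b)`. The sketch below mimics that chain in pure Python so it runs without Spark; the sample rows are made up.

```python
from functools import reduce
from itertools import groupby
from operator import add, itemgetter

# Hypothetical (user, song, play_count) triplets, counts still strings as read from file
triplets = [("u1", "SOA", "3"), ("u2", "SOA", "1"), ("u2", "SOC", "7")]

# Like map(lambda x: (x[1], int(x[2]))): convert each count to int once, up front
pairs = sorted(((song_id, int(count)) for _, song_id, count in triplets), key=itemgetter(0))

# Like reduceByKey(lambda a, b: a + b): sum the values that share a key
song_totals = {
    key: reduce(add, (value for _, value in group))
    for key, group in groupby(pairs, key=itemgetter(0))
}
print(song_totals)  # {'SOA': 4, 'SOC': 7}
```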

In [6]:
# Dividing each song's play count by the user's total play count
# User normalized rating for each song they have played: (user, song, rating)
user_ratings_rdd = user_rdd.map(lambda x: (x[0], x[1][0][0], float(x[1][0][1])/float(x[1][1])))

# Adding all the user ratings together for each song
song_rating = user_ratings_rdd.map(lambda x: (x[1], x[2]))
song_total_rating = song_rating.reduceByKey(lambda a, b : str(float(a) + float(b)))
# Joining each song's total play count onto the RDD
# Dividing the accumulated rating by the song's total play count to get its overall rating

# Normalized rating for each song
song_normalized_rating = song_total_rating.join(song_total_playcount_rdd).map(lambda x: (x[0], float(x[1][0])/float(x[1][1])))

song_normalized_rating.take(5)


[('SOPFVWP12A6D4FC636', 0.018355673620022512),
 ('SOACRJG12A8C137A8D', 0.044167506746288256),
 ('SOSOUKN12A8C13AB79', 0.011737182667946173),
 ('SOIOZHO12AB017FE5E', 0.016285321334254935),
 ('SOIVZJE12A8C13D9D8', 0.01528780047916821)]
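Written out, the two cells above compute the following, where $c_{u,s}$ denotes the play count of song $s$ by user $u$ (this notation is introduced here for clarity), $r_{u,s}$ is the per-user rating, $R_s$ is the song's overall rating, and the sums over $u$ range over the users who played $s$:

```latex
r_{u,s} = \frac{c_{u,s}}{\sum_{s'} c_{u,s'}}
\qquad
R_s = \frac{\sum_{u} r_{u,s}}{\sum_{u} c_{u,s}}
```

Since each $r_{u,s} \le 1$ and each $c_{u,s} \ge 1$, the numerator never exceeds the denominator, so $R_s$ lies in $(0, 1]$.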

## Step 3: 
For a given user_id (choose one yourself) and rating, recommend 5 other songs from the list. One way to do this is to find other users who liked the same song as this user with a rating higher than the given rating, and then recommend 5 songs based on those matched users' ratings.
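The approach described above can be sketched in plain Python (no Spark) to make the steps explicit. This is a minimal sketch, not the implementation used in the cells below: the helper name `recommend` and the sample data are hypothetical, and unlike the notebook cells it excludes the given user from the matched set and skips songs the user has already rated. Both are design choices, not requirements from the task.

```python
from collections import defaultdict

def recommend(ratings, user_id, song_id, min_rating, k=5):
    """Neighbour-based recommendation sketch (hypothetical helper).

    ratings: list of (user, song, rating) tuples, like user_ratings_rdd.
    Finds other users who rated song_id at least min_rating, sums their
    ratings for every other song, and returns the k highest-scoring songs
    that user_id has not already rated.
    """
    matched = {u for u, s, r in ratings
               if s == song_id and r >= min_rating and u != user_id}
    already_rated = {s for u, s, _ in ratings if u == user_id}

    scores = defaultdict(float)
    for u, s, r in ratings:
        if u in matched and s not in already_rated:
            scores[s] += r
    # Highest total rating first; ties broken by song ID for a stable order
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))[:k]

sample = [
    ("me", "S1", 0.5), ("me", "S2", 0.5),
    ("a", "S1", 0.6), ("a", "S3", 0.4),
    ("b", "S1", 0.2), ("b", "S4", 0.8),
    ("c", "S1", 0.7), ("c", "S3", 0.1), ("c", "S5", 0.2),
]
print(recommend(sample, "me", "S1", 0.5))
```

Here user `b` rated `S1` below the threshold, so `S4` is never recommended even though `b` rated it highly.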

In [7]:
user_ratings_rdd.take(3)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUVUHC12A67020E3B',
  0.058823529411764705),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUQERE12A58A75633',
  0.058823529411764705),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOIPJAX12A8C141A2D',
  0.058823529411764705)]

In [8]:

## Given a user id, a song id, and the user's rating for that song, recommend
## 5 other songs based on other users' ratings
user_id = 'd7083f5e1d50c264277d624340edaaf3dc16095b'
song = 'SOUVUHC12A67020E3B'
rating = 0.058823529411764705

def liked(row):
    if((row[1] == song) and (float(row[2]) >= rating)):
        return True
    return False

## Get all users who rated the chosen song at or above the given rating,
## along with the rating they gave it
## (uses the per-user normalized ratings in user_ratings_rdd)
users_same_song = user_ratings_rdd.filter(liked).map(lambda x: (x[0],x[2]))
#users_same_song.take(5)

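## Getting every song played by the users who liked the same song
## user_ratings_rdd rows are (user, song, rating) 3-tuples and join() only uses the
## first two fields, so each row carries the matched user's rating of the chosen song
## (Song, [userId, matched user's rating of the chosen song])
## take() returns only the first few rows rather than the entire dataset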
userSongs = user_ratings_rdd.join(users_same_song).map(lambda x: (x[1][0], [x[0], x[1][1]]))
userSongs.take(20)

[('SOUVUHC12A67020E3B',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOUQERE12A58A75633',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOIPJAX12A8C141A2D',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOEFCDJ12AB0185FA0',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOATCSU12A8C13393A',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOZPZGN12A8C135B45',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOPFVWP12A6D4FC636',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOHEKND12A8AE481D0',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SOPSVVG12A8C13B444',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SODSKZZ12AB0188524',
  ['d7083f5e1d50c264277d624340edaaf3dc16095b', 0.058823529411764705]),
 ('SONZTNP12A8C1321DF',
  ['d7083f5e1d50c264277d62

In [9]:
def notTheGivenSong(row):
    if(row[0] != song):
        return True
    return False

# Summing the ratings per song to get a total score for each song, excluding the chosen song
# (other songs the given user has already played are not excluded)
usersTotalRatings = userSongs.map(lambda x: (x[0], x[1][1])) \
                                .filter(notTheGivenSong) \
                                .reduceByKey(lambda accum, n: float(accum) + float(n))
usersTotalRatings.take(5)


[('SOPFVWP12A6D4FC636', 0.058823529411764705),
 ('SODSKZZ12AB0188524', 0.058823529411764705),
 ('SOCAFDI12A8C13D10E', 0.07954545454545454),
 ('SOLOHAI12A8C143976', 0.07954545454545454),
 ('SOFKABN12A8AE476C6', 0.125)]

In [10]:
## Grabbing the 5 highest accumulated ratings for the songs
recommended_songs = usersTotalRatings.takeOrdered(5, key=lambda x: -x[1])
recommended_songs

[('SOFAONV12A67020E43', 1.2715468391321159),
 ('SOURJIK12A8C138182', 0.8225715889949468),
 ('SONVPTP12A6D4F7A34', 0.8225715889949468),
 ('SOCDRXW12A6D4FA266', 0.6774193548387096),
 ('SOJQUPU12A8C1387AB', 0.6774193548387096)]

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If a user appears more than once in the top-5 set, ignore that pair and take the next best pair from the sorted list.
4. For a given user_id, identify the top-5 similar users and use their song lists to generate song recommendations.
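Steps 2 and 3 describe a greedy selection over user pairs: sort all pairs by similarity, then walk down the list, skipping any pair that would reuse a user already chosen. A minimal sketch of that rule, assuming the pair scores are already available as `((user_a, user_b), similarity)` tuples (the helper name and the sample scores are hypothetical):

```python
def top_pairs_without_overlap(pair_scores, k=5):
    """Greedily pick the k best-scoring user pairs with no user repeated (hypothetical helper).

    pair_scores: iterable of ((user_a, user_b), similarity).
    """
    chosen = []
    used = set()
    for (a, b), score in sorted(pair_scores, key=lambda item: -item[1]):
        if a in used or b in used:
            continue  # this pair reuses a user, so take the next best pair instead
        chosen.append(((a, b), score))
        used.update((a, b))
        if len(chosen) == k:
            break
    return chosen

sample_pairs = [
    (("u1", "u2"), 0.9),
    (("u1", "u3"), 0.8),
    (("u4", "u5"), 0.7),
    (("u2", "u6"), 0.6),
    (("u3", "u6"), 0.5),
]
print(top_pairs_without_overlap(sample_pairs, k=3))
# [(('u1', 'u2'), 0.9), (('u4', 'u5'), 0.7), (('u3', 'u6'), 0.5)]
```

If fewer than `k` non-overlapping pairs exist, the function simply returns as many as it found.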

In [11]:
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# Selecting the given user for part4
user_id = 'd7083f5e1d50c264277d624340edaaf3dc16095b'

# Rearranging and grouping each user with their song and user ratings
user_ratings_grouped = user_ratings_rdd.map(lambda x: (x[0], [x[1], x[2]]))
users = user_ratings_grouped.groupByKey().map(lambda x : (x[0], list(x[1])))
# Converting the RDD into a list to compute the cosine similarity, limited to the first 1000 users
# Running with 10000 users also works, but iterating over every pair of users takes much longer
users=users.take(1000)


In [12]:
# Calculating the cosine similarity for each user compared to all the other users
overall_user_similarity = []
for i in range(len(users)):
    selected_users_similarity = []
    list1 = users[i][1]
    for j in range(len(users)):
        if i == j:
            continue  # skip comparing a user with themselves
        list2 = users[j][1]

        # Map each song to [rating by user i, rating by user j], using 0 if unplayed.
        # The loop variable is named `entry` so it does not overwrite the global
        # `song` used by the Step 3 filter functions.
        dict1 = {}
        for entry in list1:
            dict1[entry[0]] = [entry[1], 0]
        for entry in list2:
            if entry[0] not in dict1:
                dict1[entry[0]] = [0, entry[1]]
            else:
                dict1[entry[0]][1] = entry[1]

        list3 = [pair[0] for pair in dict1.values()]
        list4 = [pair[1] for pair in dict1.values()]
        # Calculate the cosine similarity between the two rating vectors
        cos = cosine_similarity([list3, list4])
        selected_users_similarity.append([users[j][0], cos[0][1]])
    overall_user_similarity.append([users[i][0], selected_users_similarity])
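The loop above builds two dense vectors per pair and calls scikit-learn once for each of the roughly one million pairs, which is a large part of why it is slow. Cosine similarity only needs the dot product over songs both users rated and each user's vector norm, so a sparse version over dictionaries should give the same value up to floating-point rounding. A minimal sketch, assuming each history is a list of `[song, rating]` pairs as in `users`; the helper name and sample data are hypothetical:

```python
import math

def sparse_cosine(history_a, history_b):
    """Cosine similarity of two users' ratings given as [song, rating] lists (hypothetical helper)."""
    a = dict(history_a)
    b = dict(history_b)
    # Only songs rated by both users contribute to the dot product
    dot = sum(rating * b[song_id] for song_id, rating in a.items() if song_id in b)
    norm_a = math.sqrt(sum(r * r for r in a.values()))
    norm_b = math.sqrt(sum(r * r for r in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat an empty history as having no similarity
    return dot / (norm_a * norm_b)

u1 = [["S1", 1.0], ["S2", 2.0]]
u2 = [["S2", 2.0], ["S3", 1.0]]
print(sparse_cosine(u1, u2))  # approximately 0.8
```

Precomputing each user's dictionary and norm once, instead of per pair, would cut the work further.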

In [13]:
# Part 1
# The cosine similarity of every user compared with all the other users.
# Displaying the full nested list is slow, so it is parallelized into an RDD and only the first 5 entries are shown
cos_similarity=sc.parallelize(overall_user_similarity)
cos_similarity.take(5)

[['d7083f5e1d50c264277d624340edaaf3dc16095b',
  [['d68dc6fc25248234590d7668a11e3335534ae4b4', 0.0],
   ['fdf6afb5daefb42774617cf223475c6013969724', 0.0],
   ['10cbcd627472477dfbec90fb75017f8df6ce84ec', 0.0],
   ['3a613180775197cd08c154abe4e3f67af238a632', 0.0],
   ['6530c4fc41b9110de5d39fe0355fa103c66385f0', 0.0],
   ['47bf07bcb932cf88175ba3eb218401f9fa15fe6b', 0.0],
   ['5a68f7886f7e778490c6f13807039ff4152bcd62', 0.0],
   ['096bf9613f87c98a8fec4f8d5ab0b9e8c9d0cb14', 0.0],
   ['6206b2c8aba34ae7cff6113eae05e4c69ef034c7', 0.0],
   ['6493c305190b52657d4ea3f4adf367ffcf3427af', 0.0],
   ['0bc0faf3674077d7cf66fd3e2112d774811ba416', 0.0],
   ['202c63cd3568680561e84d33bc35740d662efccf', 0.0],
   ['baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', 0.0],
   ['8b0d2429e35ccef87639553f11bcc1ecfe8fe5a8', 0.0],
   ['1dd826ef46cea87a22be720e22a518c9bfecf3e2', 0.0],
   ['766369a79641ed084b8e6c1e1096dde15eed0fc1', 0.0],
   ['fb370fdad515525ba15cc02e1ddd9243002aa547', 0.0],
   ['02ba3b9d56ac7a6b971f638806062fb

In [14]:
# Summing each user's cosine similarities to all other users
# This ranks individual users by total similarity rather than ranking pairs of users
aggregrate_similar_scores=[]
for i in range(0,len(overall_user_similarity)):
    aggregrated_score = 0
    for j in range(0,len(overall_user_similarity[i][1])):
        aggregrated_score = aggregrated_score + overall_user_similarity[i][1][j][1]
    aggregrate_similar_scores.append([overall_user_similarity[i][0], aggregrated_score])

In [15]:
# Part 2 and 3
# Converting it back into an RDD to find the 5 highest aggregate cosine similarities
# Each entry is a single user rather than a pair, so no user can appear twice in this top-5 list
overall_cosine_similarity = sc.parallelize(aggregrate_similar_scores)
TopSimilarUsers = overall_cosine_similarity.takeOrdered(5, key=lambda x: -x[1])
TopSimilarUsers

[['3dcc24e0904e9134b581e8e93fd1efe6c23369ed', 19.153472146692437],
 ['4a16fd8943913c0268b360bd12f37a4736c2b897', 17.71093762632823],
 ['18b358e47175902b9f004ac3b5d0fa52a3704f81', 17.35138545917773],
 ['8683a5e3cf979bb0a54d5b721873e20a23abfb60', 16.992740377579054],
 ['7d7e33a519caef8dbb363abe00b4e247cf19da1b', 16.90407269590618]]

In [16]:
given_users_similarity=[]
for i in range(0,len(users)):
    if users[i][0]==user_id:
        user_spot=i
        break
user_spot

0

In [17]:
# Part 4
# Finding the cosine similarity between the selected user and all the other users
given_users_similarity = []
for i in range(len(users)):
    if users[i][0] == user_id:
        user_spot = i
        break

list1 = users[user_spot][1]
for j in range(len(users)):
    list2 = users[j][1]

    # Map each song to [selected user's rating, user j's rating], using 0 if unplayed.
    # `entry` avoids overwriting the global `song` used in Step 3.
    dict1 = {}
    for entry in list1:
        dict1[entry[0]] = [entry[1], 0]
    for entry in list2:
        if entry[0] not in dict1:
            dict1[entry[0]] = [0, entry[1]]
        else:
            dict1[entry[0]][1] = entry[1]

    list3 = [pair[0] for pair in dict1.values()]
    list4 = [pair[1] for pair in dict1.values()]
    # Calculate the cosine similarity between the two rating vectors
    cos = cosine_similarity([list3, list4])

    given_users_similarity.append([users[j][0], cos[0][1]])

In [18]:
# Selected User's cosine similarity between all the other users
given_users_similarity

[['d7083f5e1d50c264277d624340edaaf3dc16095b', 1.0000000000000004],
 ['d68dc6fc25248234590d7668a11e3335534ae4b4', 0.0],
 ['fdf6afb5daefb42774617cf223475c6013969724', 0.0],
 ['10cbcd627472477dfbec90fb75017f8df6ce84ec', 0.0],
 ['3a613180775197cd08c154abe4e3f67af238a632', 0.0],
 ['6530c4fc41b9110de5d39fe0355fa103c66385f0', 0.0],
 ['47bf07bcb932cf88175ba3eb218401f9fa15fe6b', 0.0],
 ['5a68f7886f7e778490c6f13807039ff4152bcd62', 0.0],
 ['096bf9613f87c98a8fec4f8d5ab0b9e8c9d0cb14', 0.0],
 ['6206b2c8aba34ae7cff6113eae05e4c69ef034c7', 0.0],
 ['6493c305190b52657d4ea3f4adf367ffcf3427af', 0.0],
 ['0bc0faf3674077d7cf66fd3e2112d774811ba416', 0.0],
 ['202c63cd3568680561e84d33bc35740d662efccf', 0.0],
 ['baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', 0.0],
 ['8b0d2429e35ccef87639553f11bcc1ecfe8fe5a8', 0.0],
 ['1dd826ef46cea87a22be720e22a518c9bfecf3e2', 0.0],
 ['766369a79641ed084b8e6c1e1096dde15eed0fc1', 0.0],
 ['fb370fdad515525ba15cc02e1ddd9243002aa547', 0.0],
 ['02ba3b9d56ac7a6b971f638806062fb1e4eac237', 0.0

In [19]:
# Converting it back into an rdd
# Filtering out the selected user 
# Finding 5 highest cosine similarities

def notTheGivenUser(row):
    if(row[0] != user_id):
        return True
    return False

user_cosine_similarity = sc.parallelize(given_users_similarity)
top5Users = user_cosine_similarity.filter(notTheGivenUser).takeOrdered(5, key=lambda x: -x[1])
top5Users

[['89ab5e0d2fd998440508fff198258c39fba2e5aa', 0.09626335170804942],
 ['c001cd4c088f88fee09d81961aac1da57877630f', 0.05923488777590925],
 ['cb208fb4a8f795cae329e805b924aefdb9519365', 0.04188539082916955],
 ['b50d6ee5b4e10aa8a8d1124d499b66f9464ce55f', 0.011357771260606367],
 ['d68dc6fc25248234590d7668a11e3335534ae4b4', 0.0]]
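The notebook stops at the five most similar users, while step 4.4 also asks for song recommendations from their lists. One possible approach, sketched here as an assumption rather than the method used above: weight each neighbour's rating by their cosine similarity, sum per song, and skip songs the given user has already played. The helper name and sample data are hypothetical; with the notebook's variables it could be called as `recommend_from_neighbours(users, user_id, top5Users)`.

```python
from collections import defaultdict

def recommend_from_neighbours(users, user_id, top_users, k=5):
    """Score unheard songs by similarity-weighted neighbour ratings (hypothetical helper).

    users: list of (user, [[song, rating], ...]) like the `users` list above.
    top_users: list of [user, similarity] like top5Users.
    """
    histories = {user: dict(history) for user, history in users}
    heard = set(histories.get(user_id, {}))
    scores = defaultdict(float)
    for neighbour, similarity in top_users:
        for song_id, rating in histories.get(neighbour, {}).items():
            if song_id not in heard:
                scores[song_id] += similarity * rating
    # Highest weighted score first; ties broken by song ID
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))[:k]

sample_users = [
    ("me", [["S1", 0.5], ["S2", 0.5]]),
    ("n1", [["S1", 0.4], ["S3", 0.6]]),
    ("n2", [["S3", 0.5], ["S4", 0.5]]),
]
sample_top = [["n1", 0.5], ["n2", 0.2]]
print(recommend_from_neighbours(sample_users, "me", sample_top))
```

A neighbour with similarity 0.0 contributes nothing to any score, so such entries in the top-5 list are harmless.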