# <p style="text-align: center;">MIS 285N: Big Data and Distributed Programming</p>
# <p style="text-align: center;">Project - 1 : Apache Spark</p>
## <p style="text-align: center;">Instructor: Dr. Ramesh Yerraballi</p>
## <p style="text-align: center;">Due: Tuesday, September 14th submitted via Canvas by 11:59 pm</p>

Your work should be written in a **Jupyter notebook**.   

Also, please make sure your code runs in your notebook before submitting.

**Note:**

This project is based on Map-Reduce Framework. In these you will get to work with Spark and will get to know how 
does spark work, what functionalities does spark provide, what does map-reduce framework do and why is it useful. 

In this project you will be implementing a basic song recommender system. You will be given a dataset where there are multiple csv files. These csv files have data corresponding to song play count and song information.

The data you would be using will be provided in a zip file along with this notebook. The __msd.zip__ archive contains:
1. **'kaggle_visible_evaluation_triplets.txt'**. We will be using the visible part of the testing data to understand the working on Apache Spark.  The user's listening history is provided as: (user, song, play count).  
2. In **'kaggle_songs.txt'** file, each song is marked using an index for easier representation of songs.  
3. And **'kaggle_users.txt'** file is the canonical list of user identifiers.
4. Take **'MSDChallengeGettingstarted.pdf'** as your reference.



### **What to turn in?**  

A zip folder which will have:
1. Jupyter Notebook
2. A brief report in PDF format on what features you used for recommendation. And a brief explanation of flow of your code. For example,  what RDD does what or, why it was created.
3. datasets folder with the csv files you are using in your notebook.
4. Notebook should use relative path to the csv files in datasets folder.
5. Name of the zip folder - `<your_name>_<your_partner_name>.zip`

This project consists of 4 questions:  

1. Create an RDD with _kaggle_visible_evaluation_triplets.txt_ and replace the song name with the song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify the popular song based on this rating and recommend songs to user, given user id based on the algorithm used in Movie recommender system from class. 
4. Using Cosine similarity function, identify pair-wise similarity between each pair of users and generate the top 5 most similar users without an overlap in users. 

The above list is the higer level idea about the questions. 

In [13]:
### Starter code ####
import findspark
import pandas as pd
findspark.init('C:\\apachestark')
from pyspark import SparkConf, SparkContext
#from pyspark.sql import SparkSession
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext(conf = conf)
#### These lines are to tell jupyter where to find Apache Spark ####

In [12]:
#sc.stop()

In [15]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

triplet_rdd1 = triplet_rdd
kaggle_songs_rdd = sc.textFile(r"kaggle_songs.txt") \
    .map(lambda line: line.split(" ")) 
kaggle_songs_rdd.take(5)


[['SOAAADD12AB018A9DD', '1'],
 ['SOAAADE12A6D4F80CC', '2'],
 ['SOAAADF12A8C13DF62', '3'],
 ['SOAAADZ12A8C1334FB', '4'],
 ['SOAAAFI12A6D4F9C66', '5']]

## Step 1: 
Replace song name with song index and identify the number of songs without user history

In [16]:
rdd1=triplet_rdd.map(lambda x:(x[1],(x[0],x[2])))
print(rdd1.take(5))
rdd3 = rdd1.join(kaggle_songs_rdd)
print(rdd3.take(5))
rdd4 = rdd3.map(lambda x: (x[1][0][0], x[1][1], x[1][0][1]))
rdd4.take(5)

[('SOBONKR12A58A7A7E0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')), ('SOEGIYH12A6D4FC0E3', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')), ('SOFLJQZ12A6D4FADA6', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')), ('SOHTKMO12AB01843B0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')), ('SODQZCY12A6D4F9D11', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1'))]
[('SOBONKR12A58A7A7E0', (('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1'), '25150')), ('SOBONKR12A58A7A7E0', (('c34670d9c1718361feb93068a853cead3c95b76a', '1'), '25150')), ('SOBONKR12A58A7A7E0', (('c5006d9f41f68ccccbf5ee29212b6af494110c5e', '1'), '25150')), ('SOBONKR12A58A7A7E0', (('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', '2'), '25150')), ('SOBONKR12A58A7A7E0', (('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', '4'), '25150'))]


[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '25150', '1'),
 ('c34670d9c1718361feb93068a853cead3c95b76a', '25150', '1'),
 ('c5006d9f41f68ccccbf5ee29212b6af494110c5e', '25150', '1'),
 ('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', '25150', '2'),
 ('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', '25150', '4')]

In [17]:
countt = rdd4.groupBy(lambda x: (x[1])).count()
print(kaggle_songs_rdd.count() - countt )
print(rdd4.take(5))

223007
[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '25150', '1'), ('c34670d9c1718361feb93068a853cead3c95b76a', '25150', '1'), ('c5006d9f41f68ccccbf5ee29212b6af494110c5e', '25150', '1'), ('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', '25150', '2'), ('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', '25150', '4')]


## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate the rating for all the songs. You may notice that based on all songs, the rating is almost always very low. So, think of the best way to convert song count to ratings. (Hint: Try generating ratings based on each user's song play history)

In [18]:
# Make song name the key 
triplet_map = triplet_rdd.map(lambda x : (x[1], [x[0], x[2]]))
triplet_map.take(5)

[('SOBONKR12A58A7A7E0', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1']),
 ('SOEGIYH12A6D4FC0E3', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1']),
 ('SOFLJQZ12A6D4FADA6', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1']),
 ('SOHTKMO12AB01843B0', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1']),
 ('SODQZCY12A6D4F9D11', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1'])]

In [19]:
total_play_count = triplet_map.map(lambda x:int(x[1][1])).sum()
play_count_overall = triplet_map.reduceByKey(lambda x, y: x[1]+y[1])
print(play_count_overall.take(4))
play_count_overall_1 = play_count_overall.mapValues(lambda x: int(x[1])/total_play_count)
print(play_count_overall.take(4))
print(play_count_overall_1.take(4))

[('SOBONKR12A58A7A7E0', '11'), ('SOFLJQZ12A6D4FADA6', '11'), ('SOHTKMO12AB01843B0', '51'), ('SOXLOQG12AF72A2D55', '15')]
[('SOBONKR12A58A7A7E0', '11'), ('SOFLJQZ12A6D4FADA6', '11'), ('SOHTKMO12AB01843B0', '51'), ('SOXLOQG12AF72A2D55', '15')]
[('SOBONKR12A58A7A7E0', 2.1624707525830714e-07), ('SOFLJQZ12A6D4FADA6', 2.1624707525830714e-07), ('SOHTKMO12AB01843B0', 2.1624707525830714e-07), ('SOXLOQG12AF72A2D55', 1.0812353762915357e-06)]


In [20]:
rdd5 = rdd4.map(lambda x:(x[0],(x[1],int(x[2])))) # username, song id, int(count)
print(f' t5 {rdd5.take(5)}')
rdd6 = rdd5.map(lambda x: (x[0], x[1][1])) # usernae, count
print(f' t6 {rdd6.take(5)}')
#print(t6.take(5))
rdd7 =rdd6.groupByKey().map(lambda x: (x[0],sum(list(x[1])))) # username , sum(total count)
print(f' t7 {rdd7.take(5)}')
#print(t7.take(5))
rdd8 = rdd7.join(rdd5) # username , total count , song id , songid_count
print(f' t8 {rdd8.take(5)}')
#print(t8.take(5))
rdd9 = rdd8.map(lambda x:( x[0] , x[1][1][0], x[1][1][1]/x[1][0])) # user info, song id, user's rating for the song
print(f' t9 {rdd9.take(5)}')
#print(t9.take(5))
rdd10 = rdd9.map(lambda x: (x[1], x[2])) # to get song id, rating
print(f' t10 {rdd10.take(5)}')
#print(t10.take(5))
rdd11 = rdd10.groupByKey().map(lambda x:(x[0], sum(list(x[1]))/len(list(x[1]))))
print(f' t11 {rdd11.take(5)}')
rdd11.take(5)

 t5 [('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', ('25150', 1)), ('c34670d9c1718361feb93068a853cead3c95b76a', ('25150', 1)), ('c5006d9f41f68ccccbf5ee29212b6af494110c5e', ('25150', 1)), ('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', ('25150', 2)), ('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', ('25150', 4))]
 t6 [('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1), ('c34670d9c1718361feb93068a853cead3c95b76a', 1), ('c5006d9f41f68ccccbf5ee29212b6af494110c5e', 1), ('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', 2), ('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', 4)]
 t7 [('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', 54), ('f6e34f0a68d5ea1344511e33486f956de361db78', 219), ('bcb1e6d620cf522390d5c92bae26936928e0b588', 56), ('ed199f27a41066e37414c3fe9eefb2ae372b8819', 24), ('c1d24ce8cd80e40aa8d803d5ddfceb91a6b5d75d', 15)]
 t8 [('bcb1e6d620cf522390d5c92bae26936928e0b588', (56, ('25150', 26))), ('bcb1e6d620cf522390d5c92bae26936928e0b588', (56, ('177172', 1))), ('bcb1e6d620cf522390d5c92bae26936928e0b588', (56, 

[('98924', 0.09733013669292478),
 ('302369', 0.07242217163496932),
 ('170536', 0.18490677612062836),
 ('183796', 0.06976218180987774),
 ('257058', 0.05231647594438291)]

## Step 3: 
For a given user_id (choose one by yourselves), rating, recommend 5 other songs from the list. One way to do this is based on another user who liked the same song liked by this user with rating more than the given rating and recommend the 5 songs based on the matched user's rating. 

In [22]:
def recommendation(user_id): 
    ### get user liked data
    
    
    
    user_liked_data = rdd9.filter(lambda x :x[0]==user_id )
    print(user_liked_data.take(5))
    ## get user liked songs and ratings
    
    user_liked_songs =user_liked_data.map(lambda x : (x[1], x[2]))
    print(user_liked_songs.take(5))
    # Sort this data
    
    user_liked_songs_sorted = user_liked_songs.sortBy(lambda x : -x[1])
    print(user_liked_songs_sorted.take(5))
    # User favourite song
    
    fav_song = user_liked_songs_sorted.take(1)[0][0]
    #print(user_liked_songs_sorted.collect())
    print("fav song")
    print(fav_song)
    
    # Get users liked songs
    
    main_user_liked_songs = user_liked_data.map(lambda x: x[1]).collect()
    print("main_user_liked_songs")
    print(main_user_liked_songs)
    
    # Get users favourite song's rating
    fav_song_rating = user_liked_songs_sorted.take(1)[0][1]
    print("fav_song_rating")
    print(fav_song_rating)
    
    # Get similar users data who liked the main users fav song with a rating higher than the user gave
    similar_users = rdd9.filter(lambda x: x[1] == fav_song and x[2]>= fav_song_rating).map(lambda x : x[0])
    print("similar users ")
    #print(similar_users.collect())
    print(similar_users.take(3))
    similar_users_list = similar_users.collect()
    
    ## Get the similar users full data
    recommended_users_full_data = rdd9.filter(lambda x: x[0] in similar_users_list)
    print("recommended_users")
    #print(recommended_users.collect())
  
    recommended_users_full_data_sorted = recommended_users_full_data.sortBy(lambda x : -x[2])
    print("sorted recommended_users_sorted")
    print(recommended_users_full_data_sorted.take(10))
    
    recommended_users_new_songs = recommended_users_full_data_sorted.filter(lambda x : x[1] not in main_user_liked_songs)
    print("new similar users data without the already liked songs data ")
    print(recommended_users_new_songs.take(10))

    print("final recommended songs")
    recommendations = recommended_users_new_songs.map(lambda x : x[1]).take(5)
    
   
    print(recommendations)
    #user_liked_songs_sorted = sorted(user_liked_songs,key=lambda x: -x[1], reverse=True)
    #print(user_liked_songs.collect())
    #print(user_liked_songs_sorted.collect())
    #similar_users = 

    
users_rdd = sc.textFile(r"kaggle_users.txt").map(lambda line: line.split("\t"))
user_id = users_rdd.take(2)[1][0]

print(user_id)
recommendation('bcb1e6d620cf522390d5c92bae26936928e0b588')
print("-----")
print(rdd9.take(2))

d7083f5e1d50c264277d624340edaaf3dc16095b
[('bcb1e6d620cf522390d5c92bae26936928e0b588', '25150', 0.4642857142857143), ('bcb1e6d620cf522390d5c92bae26936928e0b588', '177172', 0.017857142857142856), ('bcb1e6d620cf522390d5c92bae26936928e0b588', '212753', 0.14285714285714285), ('bcb1e6d620cf522390d5c92bae26936928e0b588', '25890', 0.017857142857142856), ('bcb1e6d620cf522390d5c92bae26936928e0b588', '259912', 0.017857142857142856)]
[('25150', 0.4642857142857143), ('177172', 0.017857142857142856), ('212753', 0.14285714285714285), ('25890', 0.017857142857142856), ('259912', 0.017857142857142856)]
[('25150', 0.4642857142857143), ('314086', 0.21428571428571427), ('212753', 0.14285714285714285), ('225548', 0.08928571428571429), ('177172', 0.017857142857142856)]
fav song
25150
main_user_liked_songs
['25150', '177172', '212753', '25890', '259912', '314086', '334240', '105694', '225548']
fav_song_rating
0.4642857142857143
similar users 
['bcb1e6d620cf522390d5c92bae26936928e0b588', '03be4cbe2d71991995cf

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If the top-5 user set has an user appearing more than once, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list. 

In [27]:
#### All songs and their count 

#
# user-based filtering


#1. Compute cosine similarity between all pairs of users. 


import numpy as np

from pyspark.sql import SparkSession
spark = SparkSession(sc)
def dot(v,w):
    """ v_1*w_1 + ... v_n*w_n"""
    return sum(v_i *w_i
              for v_i,w_i in zip(v,w))
def cosine_similarity(v, w):
    return dot(v, w) / math.sqrt(dot(v, v) * dot(w, w))




#unique_interests = kaggle_songs_rdd.map(lambda x : x[0]).take(400)

## We have taken a reduced version of the data named in file : reduced_data
triplet_rdd_reduced = sc.textFile(r"reduced_data.txt")\
                        .map(lambda line: line.split("\t"))
print(triplet_rdd_reduced.take(5))

#taking all the songs from the data
unique_interests = triplet_rdd_reduced.map(lambda x : x[1]).collect()

#print(unique_interests)
## Taking all the unique songs from the data
unique_interests_arr = np.array(unique_interests)
unique_interests_df_distict = np.unique(unique_interests_arr)

unique_interests_df_distict
print(unique_interests_df_distict)
print(unique_interests_df_distict.shape)

#unique_interests_df_distict_list = unique_interests_df_distict.rdd.map(lambda x:x[1]).collect()
#print(type(unique_interests_df_distict_list))'''
#top_unique_interests_df_distict =  unique_interests_df_distict.head(500)
#print(top_unique_interests_df_distict)
'''unique_interests = sorted(list({ interest
                                 for user_interests in users_interests
                                 for interest in user_interests }))
unique_interests'''




[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1'], ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3', '1'], ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6', '1'], ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0', '1'], ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11', '1']]
['SOAAGFH12A8C13D072' 'SOAAVUV12AB0186646' 'SOACRJG12A8C137A8D'
 'SOACXGP12A8C1323AF' 'SOADVUP12AB0185246' 'SOAFQQP12A58A7B709'
 'SOAIBYI12AB0185C5B' 'SOAIIKF12A58A7DD58' 'SOALUPO12A8C131951'
 'SOARIXF12A8AE487CA' 'SOATBYQ12AB0188962' 'SOATCSU12A8C13393A'
 'SOAUWYT12A81C206F1' 'SOAXGDH12A8C13F8A1' 'SOAYETG12A67ADA751'
 'SOAYFQF12AF729E879' 'SOAYGQW12AB017D6FA' 'SOAZRJG12A6D4F8A30'
 'SOBADEB12AB018275F' 'SOBADWO12A8C13D83A' 'SOBBABK12A8C13FFBC'
 'SOBCDFC12A58A7B4B3' 'SOBEHXG12A8C138D22' 'SOBENFI12AB018B09A'
 'SOBEVGM12A67ADBCA7' 'SOBKRVG12A8C133269' 'SOBKZLH12A8C13B7D3'
 'SOBOAFP12A8C131F36' 'SOBONKR12A58A7A7E0' 'SOBPFUQ12A6D4F

'unique_interests = sorted(list({ interest\n                                 for user_interests in users_interests\n                                 for interest in user_interests }))\nunique_interests'

In [None]:

def make_user_interest_vector(user_interests):
    """given a list of interests, produce a vector whose i-th element is 1
    if unique_interests[i] is in the list, 0 otherwise"""
    
    return [1 if interest in user_interests else 0
            for interest in unique_interests_df_distict]

## Droping the song count information form the reduced data

triplet_rdd_song_user=triplet_rdd_reduced.map(lambda x: (x[0],x[1]))
# Grouping this data on the user as key

triplet_rdd_song_user=triplet_rdd_song_user.groupByKey().map(lambda x: (x[0],list(x[1])))
## Taking only top 400 of this data because of the memory restrictions

# taking the songs data in one list
triplet_rdd_song = triplet_rdd_song_user.map(lambda x: x[1]).take(400)

# Taking the users data in another list
triplet_rdd_user = triplet_rdd_song_user.map(lambda x : x[0]).take(400)
#print(triplet_rdd_song.take(10))
#print(triplet_rdd_key_as_user_grouped.collect())
#users_interests = triplet_rdd_key_as_user_grouped.take(400)

## Computing the user interests matrix
user_interest_matrix = list(map(make_user_interest_vector, triplet_rdd_song))
print(user_interest_matrix)



In [None]:
#pip install numpy

In [None]:
import math

user_similarities = [[cosine_similarity(interest_vector_i, interest_vector_j)
                      for interest_vector_j in user_interest_matrix]
                     for interest_vector_i in user_interest_matrix]

import numpy as np
user_similarities_arr = np.array(user_similarities)
print(user_similarities_arr.shape)

## Make the diagnol points zero as every user has highest similarity to itself
for i in range(0,user_similarities_arr.shape[0]):
    for j in range(0, user_similarities_arr.shape[1]):
        if i ==j:
            user_similarities_arr[i][j] = 0.0
#user_similarities_arr_reset = np.where(user_similarities_arr == 1.0)


print(np.amax(user_similarities_arr))

result = np.where(user_similarities_arr == np.amax(user_similarities_arr))
print(result)
## cosine matrix
#print(user_similarities)

arr_to_initialise = []
## creating the user pair with similarity score

for i in range(0,user_similarities_arr.shape[0]):
    for j in range(0, user_similarities_arr.shape[1]):
        if i < j:
            arr_to_initialise.append((i, j, user_similarities_arr[i][j]))

            #arr_to_initialise.append((triplet_rdd_user[i], triplet_rdd_user[j], user_similarities_arr[i][j]))
#print(arr_to_initialise)



In [None]:


##  Creating an RDD with this data
arr_to_pass_tosc = np.array(arr_to_initialise)
print(type(arr_to_pass_tosc))
newRDD = sc.parallelize(arr_to_pass_tosc)

# sorting this rdd on the similarity score value

users_sorted_with_similarity = newRDD.sortBy(lambda x : -x[2])
# Take top 5
top_users = users_sorted_with_similarity.take(5)
top_users
####  Top users


In [None]:
###3. If the top-5 user set has an user appearing more than once, 
#ignore that pair and take the next best pair from the sorted list.

## I have already dropped the pair which were repeating now 
#I will drop any pair for which even one user already has a pair.
## 

similarity_list = users_sorted_with_similarity.collect()
similarity_list
similarity_list_arr = np.array(similarity_list)
similarity_list_arr
already_taken = []
print(similarity_list_arr.shape)



unique_top_similar = []


for i in range(0, similarity_list_arr.shape[0]):
    
    ## Dropping the user pair if user already been considered 
    if similarity_list_arr[i][0] in already_taken:
        continue
    if similarity_list_arr[i][1] in already_taken:
        continue
   
    unique_top_similar.append(similarity_list_arr[i])
    already_taken.append(similarity_list_arr[i][0])
    already_taken.append(similarity_list_arr[i][1])
    
    
## Get only 5 top distinct wiuthout any user repetition

unique_top_similar_top_five = unique_top_similar[:5]
print(unique_top_similar_top_five)
    
        

In [None]:
##4. For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list. 



given_user_sid = 1
## Part 4 
user_similarities = [[cosine_similarity(interest_vector_i, interest_vector_j)
                      for interest_vector_j in user_interest_matrix]
                     for interest_vector_i in user_interest_matrix]

def most_similar_users_to(user_id):
    pairs = [(other_user_id, similarity)              # find other
             for other_user_id, similarity in         # users with
                enumerate(user_similarities[user_id]) # nonzero
             if user_id != other_user_id and similarity > 0]  # similarity

    return sorted(pairs,                              # sort them
                  key=lambda pair: pair[1],           # most similar
                  reverse=True) 

#print(user_similarities)
most_similar_users_to_id = most_similar_users_to(given_user_sid)
most_similar_users_to_id

In [None]:
given_user_sid = triplet_rdd_user[int(given_user_sid)]
given_user_sid

In [None]:


a = users_sorted_with_similarity.filter(lambda x : x[0]== giveuser_id).map(lambda x:x[1])
top_5_similar = a.take(5)
print(top_5_similar)
user_similar_id_str = []

## Top 5 similar users :
for i in range(0, len(most_similar_users_to_id)):
    print(most_similar_users_to_id[i][0])
    print(triplet_rdd_user[int(most_similar_users_to_id[i][0])])
    
    ## Getting the str version of the similar  user ids
    user_similar_id_str.append(triplet_rdd_user[int(most_similar_users_to_id[i][0])])
 



In [None]:
print(user_similar_id_str)
#print(triplet_rdd_song_user.collect())

similar_user_songs = triplet_rdd_song_user.filter(lambda x : x[0] in user_similar_id_str).flatMap(lambda x : x[1]).collect()

## users alreday liked songs
user_already_liked_songs = triplet_rdd_song_user.filter(lambda x : x[0] == given_user_sid).flatMap(lambda x : x[1]).collect()

similar_user_songs
new_song_suggestions = []
for i in range(0, len(similar_user_songs)):
    
    # Adding the new recommendations
    if similar_user_songs[i] not in user_already_liked_songs:
        
        print(similar_user_songs[i])
        new_song_suggestions.append(similar_user_songs[i])
new_song_suggestions

