# Rutwick Bhawsar | Shivang Arya
## Project 1: Apache Spark

Instructor: Ramesh Yerraballi
TA: Madhumitha Sakthi
Semester: Fall 2019
Due Date: 11:59pm, Monday 9/16

This project is based on the MapReduce framework. In it you will work with Spark and learn how Spark works, what functionality Spark provides, what the MapReduce framework does, and why it is useful.

In this project you will implement a basic song recommender system. You will be given a dataset made up of several files that contain song play counts and song information.

The data you will use is provided in a zip file along with this notebook. The __msd.zip__ archive contains '_kaggle_visible_evaluation_triplets.txt_'. We will use the visible part of the test data to understand how Apache Spark works. Each user's listening history is given as (user, song, play count) triplets. A second file, '_kaggle_songs.txt_', assigns each song an index so that songs are easier to represent.
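The two input formats can be sketched in plain Python (no Spark required). The sample lines are rebuilt from outputs shown later in this notebook. The delimiters (tab for triplets, a single space for the song index) are an assumption based on the `split("\t")` and `split(" ")` calls in cells In [2] and In [3].

```python
def parse_triplet(line):
    """Split 'user<TAB>song<TAB>count' into (user, song, play_count)."""
    user, song, count = line.rstrip("\n").split("\t")
    return user, song, int(count)

def parse_song_index(line):
    """Split 'song index' (space-separated) into (song, index)."""
    song, index = line.rstrip("\n").split(" ")
    return song, int(index)

# Sample lines reconstructed from the notebook's own outputs.
triplet_line = "fd50c4007b68a3737fe052d5a4f78ce8aa117f3d\tSOBONKR12A58A7A7E0\t1\n"
song_line = "SOAAADD12AB018A9DD 1\n"

print(parse_triplet(triplet_line))
print(parse_song_index(song_line))
```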



What to turn in:
A zip folder which will have:
- Jupyter Notebook
- A brief report on the features you used for recommendation, plus a brief explanation of the flow of your code (for example, what each RDD does and why it was created).
- datasets folder with the csv files you are using in your notebook.
- Notebook should use relative path to the csv files in datasets folder.
- Name of the zip folder - <your\_name>\_<your\_partner_name>.zip


This project consists of 4 questions. 
1. Create an RDD with _msd_evalutation_triplets.txt_ and replace the song name with the song index from _msd_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify popular songs based on this rating and recommend songs to a user, given a user ID, using the algorithm from the movie recommender system covered in class. 
4. Using Cosine similarity function, identify pair-wise similarity between each pair of users and generate the top 5 most similar users without an overlap in users. 

The list above gives a high-level overview of the questions. 

In [1]:
##### These lines are to tell jupyter where to find Apache Spark ####
import findspark
#findspark.init('C:\\apachespark')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
# There are two configurable parameters:
# 1. A cluster URL, namely "local[*]" in this example, which tells Spark how to connect
# to a cluster. "local" is a special value that runs Spark on one thread on the local
# machine, without connecting to a cluster; "local[*]" runs it locally with as many
# worker threads as there are logical cores.
# 2. An application name, namely "Songs" in this example. This identifies your
# application on the cluster manager's UI if you connect to a cluster.
sc = SparkContext(conf = conf)
##### These lines are to tell jupyter where to find Apache Spark ####

In [2]:
## Read triplet file into RDD
triplet_rdd = sc.textFile("kaggle_visible_evaluation_triplets.txt").map(lambda line: line.split("\t")) 

In [3]:
songs_rdd=sc.textFile("kaggle_songs.txt").map(lambda line: line.split(" "))

# Step 1: 
Replace song name with song index and identify the number of songs without user history

## Step 1. Part 1

In [4]:
### Step 1 Part 1: Replacing the song name with song index

### Mapping the columns of the kaggle_songs dataset to a key-value pair RDD. 
###'songswith' is the new RDD. Song Name is the key and Song Index is the value.

songswith = songs_rdd.map(lambda x: ((x[0]),(x[1])))
songswith.collect()

[('SOAAADD12AB018A9DD', '1'),
 ('SOAAADE12A6D4F80CC', '2'),
 ('SOAAADF12A8C13DF62', '3'),
 ('SOAAADZ12A8C1334FB', '4'),
 ('SOAAAFI12A6D4F9C66', '5'),
 ('SOAAAGK12AB0189572', '6'),
 ('SOAAAGN12AB017D672', '7'),
 ('SOAAAGO12A67AE0A0E', '8'),
 ('SOAAAGP12A6D4F7D1C', '9'),
 ('SOAAAGQ12A8C1420C8', '10'),
 ('SOAAAKE12A8C1397E9', '11'),
 ('SOAAALJ12AB01828B4', '12'),
 ('SOAAAMT12AB018C9C4', '13'),
 ('SOAAANN12A8C14425E', '14'),
 ('SOAAANV12A6D4FB6C4', '15'),
 ('SOAAAQN12AB01856D3', '16'),
 ('SOAAAQQ12A8C13785B', '17'),
 ('SOAAAUY12A8C133A1B', '18'),
 ('SOAAAWK12A8C134738', '19'),
 ('SOAAAXN12A8C13A70C', '20'),
 ('SOAAAZL12A8C135238', '21'),
 ('SOAABCT12AB0185A57', '22'),
 ('SOAABDH12A6D4F7658', '23'),
 ('SOAABDR12AF72A056F', '24'),
 ('SOAABDX12A58A7E508', '25'),
 ('SOAABEX12A8AE467CB', '26'),
 ('SOAABII12AB0189914', '27'),
 ('SOAABLG12A6D4F73D2', '28'),
 ('SOAABLP12A6D4F8861', '29'),
 ('SOAABMP12A6D4F7633', '30'),
 ('SOAABNE12A8C141154', '31'),
 ('SOAABQL12A67020E76', '32'),
 ('SOAABQR12AF72A

In [5]:
### We now take the kaggle_visible_evaluation_triplets.txt dataset and map it to a key-value RDD. 
### The key is the Song Name; the value is the (User Name, Number of Times Played) pair. 
### We store it in the temptriplet RDD. 

temptriplet = triplet_rdd.map(lambda x: ((x[1]),(x[0],x[2])))
temptriplet.collect()

[('SOBONKR12A58A7A7E0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOEGIYH12A6D4FC0E3', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOFLJQZ12A6D4FADA6', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOHTKMO12AB01843B0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SODQZCY12A6D4F9D11', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOXLOQG12AF72A2D55', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOUVUHC12A67020E3B', ('d7083f5e1d50c264277d624340edaaf3dc16095b', '1')),
 ('SOUQERE12A58A75633', ('d7083f5e1d50c264277d624340edaaf3dc16095b', '1')),
 ('SOIPJAX12A8C141A2D', ('d7083f5e1d50c264277d624340edaaf3dc16095b', '1')),
 ('SOEFCDJ12AB0185FA0', ('d7083f5e1d50c264277d624340edaaf3dc16095b', '2')),
 ('SOATCSU12A8C13393A', ('d7083f5e1d50c264277d624340edaaf3dc16095b', '1')),
 ('SOZPZGN12A8C135B45', ('d7083f5e1d50c264277d624340edaaf3dc16095b', '1')),
 ('SOPFVWP12A6D4FC636', ('d7083f5e1d50c264277d624340edaaf3dc16095b', '1')),
 ('SOHEKND12

In [6]:
### We use a join to combine 'songswith' with the 'temptriplet' RDD. 
### The Song Name, which is the key in both RDDs, is used to join them.
### We store the output of the join in the final1 RDD. 

final1=temptriplet.join(songswith)
final1.collect()

[('SOBONKR12A58A7A7E0',
  (('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('c34670d9c1718361feb93068a853cead3c95b76a', '1'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('c5006d9f41f68ccccbf5ee29212b6af494110c5e', '1'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', '2'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', '4'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('f6e34f0a68d5ea1344511e33486f956de361db78', '1'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('e326c4b9fe3659ec1dc3af53fd7e0893809dafbc', '25'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('00f7c493ee64884998ea98d9f5bed87bc4a0afcf', '5'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('daa9e7e53ae787ab4f1b5518b695198947d821a2', '1'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('cd4321d8fd42ba44996e7f34c2f6404cf5884696', '1'), '25150')),
 ('SOBONKR12A58A7A7E0',
  (('bcb1e6d620cf522390d5c92bae26936928e0b588', '26'), '25150')),
 ('SOBONKR12A58A7A7

In [7]:
### The Song Name column is no longer needed, so we drop it with map.
### Each record is now (User Name, Song Index, Number of Times Played); this answer to Step 1 Part 1 is saved in the finalans RDD. 

finalans=final1.map(lambda x: (x[1][0][0],x[1][1],x[1][0][1]))
finalans.top(10)

[('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '99278', '1'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '55040', '2'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '54501', '1'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '47950', '1'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '3881', '1'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '38755', '1'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '314143', '1'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '243454', '25'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '226936', '5'),
 ('ffff07d7d9bb187aa58c7b81b3d3f35e7cf7c0ee', '170115', '1')]
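The join in cells In [5] to In [7] can be sketched in plain Python with a dictionary lookup. The inputs below are a hypothetical miniature of 'songswith' and 'triplet_rdd'; the index '25150' for 'SOBONKR12A58A7A7E0' comes from the output above. Like an RDD inner join, the sketch drops triplets whose song has no index entry. It shows only the logic, not Spark's distributed join.

```python
# Hypothetical miniature inputs mirroring 'songswith' and 'triplet_rdd'.
song_index = {"SOBONKR12A58A7A7E0": "25150", "SOAAADD12AB018A9DD": "1"}
triplets = [
    ("fd50c4007b68a3737fe052d5a4f78ce8aa117f3d", "SOBONKR12A58A7A7E0", "1"),
    ("c34670d9c1718361feb93068a853cead3c95b76a", "SOBONKR12A58A7A7E0", "1"),
    ("hypothetical_user", "SONOTINDEXED000000", "3"),  # no index entry
]

def replace_song_with_index(triplets, song_index):
    """Inner-join triplets with the index; return (user, song_index, count)."""
    return [
        (user, song_index[song], count)
        for user, song, count in triplets
        if song in song_index  # an inner join drops unmatched keys
    ]

print(replace_song_with_index(triplets, song_index))
```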

## Step 1. Part 2

In [8]:
### Step 1 Part 2: Identifying the number of songs without user history. 

### i.e., finding all songs that are present in 'kaggle_songs.txt'
### but absent from 'kaggle_visible_evaluation_triplets.txt'.

### First we extract the Song Name column from the 'triplet_rdd' RDD and save it as 'usersongs'. 
### We extract the Song Name from the 'songs_rdd' RDD and save it as 'totalsongs'. 

usersongs=triplet_rdd.map(lambda x: x[1])
totalsongs=songs_rdd.map(lambda x: x[0])

In [9]:
### We find out all unique songs from the 'usersongs' since multiple users could have listened to the same song.

usersongs=usersongs.distinct()

### We then subtract the distinct 'usersongs' from the 'totalsongs' RDD to get the songs without user history.

songswithoutuserhistory=totalsongs.subtract(usersongs)

### To get the total number of songs without user history we use the count function. 
### The answer to Step 1 Part 2 is saved in number.

number = songswithoutuserhistory.count()
print("\nNumber of songs without User History: ",number)


Number of songs without User History:  223007
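Step 1 Part 2 is a set difference: all indexed songs minus the distinct songs that appear in the triplets. Here is a plain-Python sketch with hypothetical toy song IDs. It assumes each song appears only once in the index, since `set()` would collapse duplicates that `RDD.subtract` keeps.

```python
# Hypothetical toy data: every song in the index, and the songs users played.
all_songs = ["S1", "S2", "S3", "S4", "S5"]
played_songs = ["S2", "S2", "S4"]  # duplicates, like 'usersongs' before distinct()

def count_songs_without_history(all_songs, played_songs):
    # distinct() followed by subtract() corresponds to a set difference here
    return len(set(all_songs) - set(played_songs))

print(count_songs_without_history(all_songs, played_songs))  # 3
```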


## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate the rating for all the songs. You may notice that based on all songs, the rating is almost always very low. So, think of the best way to convert song count to ratings. (Hint: Try generating ratings based on each user's song play history)
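The normalization in the example above divides each song's play count by the total number of plays. A minimal sketch using the example's numbers:

```python
def normalize(play_counts):
    """Map {song: play_count} to {song: share of all plays} (values sum to 1)."""
    total = sum(play_counts.values())
    return {song: count / total for song, count in play_counts.items()}

ratings = normalize({"song_1": 5, "song_2": 10, "song_3": 5})
print(ratings)  # {'song_1': 0.25, 'song_2': 0.5, 'song_3': 0.25}
```

When this is applied to all songs at once, the denominator is the play count of the entire dataset (4,624,340 plays, as computed in Step 3). That is why the global ratings come out so small. The hint points toward using each user's own total play count as the denominator instead.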

In [10]:
### Step 2 

### First we compute a rating based on all songs. 
### We keep the Song Name and Number of Times Played columns from the original RDD.
### We then reduce by key (Song Name), adding up the number of times each song was played across all users. 
### We store this RDD in the 'temp' variable.

songsplayedtotal = triplet_rdd.map(lambda x: (x[1],int(x[2])))
temp = songsplayedtotal.reduceByKey(lambda x, y: x + y)
temp.collect()

[('SOBONKR12A58A7A7E0', 35432),
 ('SOFLJQZ12A6D4FADA6', 7895),
 ('SOHTKMO12AB01843B0', 10515),
 ('SOXLOQG12AF72A2D55', 4671),
 ('SOZPZGN12A8C135B45', 39),
 ('SOPFVWP12A6D4FC636', 149),
 ('SODSKZZ12AB0188524', 27),
 ('SOACRJG12A8C137A8D', 12),
 ('SOSOUKN12A8C13AB79', 134),
 ('SOFRQTD12A81C233C0', 19454),
 ('SOZQIUZ12A8C13CFBE', 11),
 ('SOAAGFH12A8C13D072', 29),
 ('SOFIPHI12AAF3B3DB2', 434),
 ('SOORFBA12A8C13E49E', 86),
 ('SOXPJVO12A6D4FCC69', 229),
 ('SOQMFWG12AB0186AD8', 184),
 ('SOUIROO12A8C139C18', 71),
 ('SOSHJHA12AB0181410', 822),
 ('SOSERCK12AB0186462', 43),
 ('SOUAZDS12A8C13BA91', 220),
 ('SOQCTEC12AB0186D7E', 55),
 ('SOBXMJC12AB018293E', 25),
 ('SODBXDO12A6D4FCD4F', 128),
 ('SOADVUP12AB0185246', 175),
 ('SOIWEDH12AF72AB132', 13),
 ('SOVOGGO12A8C1433B2', 51),
 ('SOUGACV12A6D4F84E0', 19),
 ('SOVDSJC12A58A7A271', 11610),
 ('SOBPFUQ12A6D4FCDD2', 22),
 ('SOHOTAA12A8AE45F43', 460),
 ('SOEXWRK12AAF3B26AE', 29),
 ('SOWLMMH12A81C22F5E', 29),
 ('SOONUTJ12A6701D7B4', 264),
 ('SONXYJW12AB01

In [11]:
### Now we compute the total number of plays across all songs and store it as 'ssum'.

ssum = temp.map(lambda x: int(x[1])).reduce(lambda x,y:int(x)+int(y))

### We then divide the number of times a song is played by the total number of times all songs have been played.
### This gives the rating based on all songs. 
### We multiply this rating by 100000 to scale it. 
### We save this in the 'songrating' RDD.
### As you can see, the highest-rated song in our dataset has a rating of about 766 out of 100000. 

songrating = temp.map(lambda x: (x[0],(int(x[1])/ssum)*100000)).sortBy(lambda x: -x[1])
songrating.collect()

[('SOBONKR12A58A7A7E0', 766.2066370552338),
 ('SOAUWYT12A81C206F1', 717.4861709995373),
 ('SOSXLTC12AF72A7F54', 526.7562506217104),
 ('SOFRQTD12A81C233C0', 420.68706020751074),
 ('SOEGIYH12A6D4FC0E3', 370.1068693045927),
 ('SOAXGDH12A8C13F8A1', 308.7791987613368),
 ('SONYKOW12AB01849C9', 267.97337566009423),
 ('SOVDSJC12A58A7A271', 251.06285437489456),
 ('SOUFTBI12AB0183F65', 233.4170930338167),
 ('SOHTKMO12AB01843B0', 227.38379963410995),
 ('SOPUCYA12A8C13A694', 214.5387233637665),
 ('SOOFYTN12A6D4F9B35', 209.43529238767047),
 ('SOBOUPA12A6D4F81F1', 194.01687592175315),
 ('SODJWHY12A8C142CCE', 188.52420021019216),
 ('SOLFXKT12AB017E3E0', 175.00875800654796),
 ('SOTCMDJ12A6D4F8528', 172.32729427334496),
 ('SOFLJQZ12A6D4FADA6', 170.72706591643347),
 ('SOTWNDJ12A8C143984', 158.53073087186496),
 ('SOUVTSM12AC468F6A7', 155.91414126123942),
 ('SOUNZHU12A8AE47481', 152.49743747215817),
 ('SOUSMXX12AB0185C24', 146.5722676100806),
 ('SOUDLVN12AAFF43658', 143.82592975430006),
 ('SOWEHOM12A6BD4E

In [12]:
### Continuing the same Step 2.
### We are generating ratings based on each user's song play history

### We want the total number of times each user has played all of their songs.
### We first take the 'User Name' and 'Number of Times Played' columns from the original RDD and store them in 'songsplayeduser'.
### We then reduce by 'User Name', adding up the 'Number of Times Played' column, and store the result in 'totaldenom'.
### This gives the total number of plays for each user. 

### As we can see, the top user has played their songs 1305 times. 

songsplayeduser = triplet_rdd.map(lambda x: (x[0],int(x[2])))
totaldenom = songsplayeduser.reduceByKey(lambda x, y: int(x)+int(y)).sortBy(lambda x:-x[1])
totaldenom.collect()

[('090b841eaf56d343a26625c2c6d08b823927bc4f', 1305),
 ('938c2632d43eeadb8a83a7cc254d014f9cea6afe', 1267),
 ('1c5aa998482a40abfd020759e7d757eb6c510e72', 1200),
 ('c6150292374fb1dad89982367b3245dd5004c718', 1192),
 ('5a9375e46a7e9b869058c7bc0e820e00d77f3e0b', 1184),
 ('d1d845a92cd34456423e781512bdb502ca385b51', 1180),
 ('957440a77858369fb7a6bcc6fa408fc187d5bd7b', 1150),
 ('315103a41c2ced1143de0c2ba20de224800e6d89', 1148),
 ('22bb29714137fa47083963c30e1a26f1bf517e7d', 1141),
 ('bda891a59a96252cc0f5b1f63f2630692b490e37', 1140),
 ('b51eeda3c09e2426f2e52dca8444a8c0cab6265a', 1119),
 ('b371df81acfde601d157a99c50fa06f7b3f76a84', 1038),
 ('9d12c30de5d9975f621f95f512699989ac544410', 1001),
 ('7bb7949be3d91d27f1c5613bea01644fcad77f43', 996),
 ('a1cc4d088600df2db42e6a305974675ffe8c905d', 951),
 ('77867fcbd93f3bf47f478a797f3bfabb0f561ea8', 926),
 ('a86e68128c6a48d3392db22ce1151a3bdccf5fcf', 912),
 ('08879a07270b36aaa09923890b221fdef1e9849e', 884),
 ('9948794b0687c0b3cd2094845cb7915d55793492', 884),

In [13]:
### Once again, we take the original RDD and store it as a key-value pair RDD in 'temptriplet'.
### The key is the User Name; the value is the (Song Name, Number of Times Played) pair. 

temptriplet = triplet_rdd.map(lambda x: (x[0],(x[1],x[2])))

### We then join it with our 'totaldenom' RDD, which gives four fields: User Name, Song Name, Times Played, and the user's total play count.
temp = temptriplet.join(totaldenom)
temp.collect()

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOUVUHC12A67020E3B', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOUQERE12A58A75633', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOIPJAX12A8C141A2D', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOEFCDJ12AB0185FA0', '2'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOATCSU12A8C13393A', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOZPZGN12A8C135B45', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOPFVWP12A6D4FC636', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOHEKND12A8AE481D0', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOPSVVG12A8C13B444', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SODSKZZ12AB0188524', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SONZTNP12A8C1321DF', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOVVLKF12A8C1424F0', '1')

In [14]:
### After the join we compute the rating per song per user. 
### We divide the number of times each song was played by the total number of times that user played all of their songs. 
### We multiply the rating by 100 to make it easier to read,
### so each rating is the song's percentage of the user's total plays (out of 100).
### The final answer for Step 2 is saved in the 'finalans2' RDD.

finalans2 = temp.map(lambda x: (x[0],x[1][0][0],(int(x[1][0][1])/x[1][1])*100))
finalans2.collect()

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUVUHC12A67020E3B',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUQERE12A58A75633',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOIPJAX12A8C141A2D',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOEFCDJ12AB0185FA0',
  11.76470588235294),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOATCSU12A8C13393A',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOZPZGN12A8C135B45',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOPFVWP12A6D4FC636',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOHEKND12A8AE481D0',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOPSVVG12A8C13B444',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SODSKZZ12AB0188524',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SONZTNP12A8C1321DF',
  5.88235294117647)
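The per-user rating from cells In [12] to In [14] can be sketched in plain Python. The history below is hypothetical. It follows the pattern of user 'd7083f5e...' above: 17 total plays, with one song played twice. Each play count is divided by that user's total and multiplied by 100, so one user's ratings always sum to 100.

```python
from collections import defaultdict

def per_user_ratings(triplets):
    """Return {(user, song): rating out of 100} from (user, song, count) triplets."""
    totals = defaultdict(int)  # plays this role of 'totaldenom'
    for user, _, count in triplets:
        totals[user] += count
    return {(user, song): count / totals[user] * 100
            for user, song, count in triplets}

# Hypothetical history: 15 songs played once plus one song played twice (17 plays).
history = [("u1", f"song_{i}", 1) for i in range(15)] + [("u1", "fav", 2)]
ratings = per_user_ratings(history)
print(round(ratings[("u1", "fav")], 4))     # 11.7647
print(round(ratings[("u1", "song_0")], 4))  # 5.8824
```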

## Step 3: 
For a given user_id and rating, recommend 5 other songs from the list. One way to do this is to find other users who liked the same song as this user with a rating higher than the given rating, and then recommend 5 songs based on the matched users' ratings. 

In [15]:
### Step 3: Recommendations. 

### We first initialize our user_id and song_id. 

user_id="f6e34f0a68d5ea1344511e33486f956de361db78"
song_id="SOMDBJK12A6D4F873A"

### As in Step 2, we want a global rating.
### We use it to recommend higher-rated songs from other users who like the same song.
### These are exactly the steps we followed in Step 2 to compute the total play count across all users,
### stored in 'ssum'.

songsplayedtotal = triplet_rdd.map(lambda x: (x[1],int(x[2])))
temp = songsplayedtotal.reduceByKey(lambda x, y: x + y)
ssum = temp.map(lambda x: int(x[1])).reduce(lambda x,y:int(x)+int(y))
print("\nTotal number of songs played by all Users: ",ssum,"\n")


### Unlike Step 2, we compute a global rating for each (user, song) pair: that pair's play count divided by 'ssum', scaled by 100000. 
### We store it in the 'final3' RDD. 
### It has three columns: User Name, Song Name, Rating.
final3 = triplet_rdd.map(lambda x: (x[0],x[1],(int(x[2])/ssum)*100000))
final3.collect()


Total number of songs played by all Users:  4624340 



[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOBONKR12A58A7A7E0',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOEGIYH12A6D4FC0E3',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOFLJQZ12A6D4FADA6',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOHTKMO12AB01843B0',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SODQZCY12A6D4F9D11',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOXLOQG12AF72A2D55',
  0.021624707525830716),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUVUHC12A67020E3B',
  0.021624707525830716),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUQERE12A58A75633',
  0.021624707525830716),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOIPJAX12A8C141A2D',
  0.021624707525830716),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOEFCDJ12AB0185FA0',
  0.04324941505166143),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SO

In [16]:

### We want to get the song rating for the given User Name and Song Name.
### We filter the 'final3' to get the row where Song Name = given Song Name and User Name = given user_id
### We then extract the rating column from the row
### Save this in 'T' RDD.
### Explicitly convert the only value of 'T' RDD into a float number and save it as 'n'.

T=final3.filter(lambda x: (x[0]==user_id and x[1]==song_id)).map(lambda x: x[2])
n=float(T.collect()[0])
print("\nThe song rating for the given User Name and Song Name is: ",n)


The song rating for the given User Name and Song Name is:  0.10812353762915357


In [17]:

### From our 'final3' RDD we filter the rows where:
### User Name != given user_id AND
### Song Name == given song name AND
### Rating > the rating our user gave that song.
### This is to ensure that the set of rows we extract contain Users that have rated the same song higher than our given User.

T=final3.filter(lambda x: (x[0]!=user_id and x[1]==song_id and float(x[2])>n))
T.top(3)

### We map it to the 'TKV' RDD so that we can see the other users and the ratings they gave to our song.

TKV=T.map(lambda x: (x[0],(x[2])))
TKV.collect()

[('f26dd0f6f3831009dc603c0f8a78be5cfdda08d4', 0.1297482451549843),
 ('ac161501199a1b984f8deff58ef592b9ebf67483', 0.8433635935073978),
 ('c87a0f11929453f3fb3c75ee26d24183901602b1', 0.151372952680815),
 ('38184d13862f7466d8318557b921e91720fdef40', 0.1297482451549843),
 ('7c78aa227e6cf1db5dd944225079cb4ed9ce5b44', 0.151372952680815),
 ('1fb9f1ecd99c6954f5fe1661555cae25eb620cbb', 0.151372952680815),
 ('79467772a02de3abd8c2f40c88b6b482830b9887', 0.3243706128874607),
 ('18ebbc135a61c1bde96622444092507b17a157a5', 0.6271165182490906),
 ('a9afaf3bf52288cd56cc6e32384a2fa8b4fa38d1', 0.21624707525830714),
 ('0073edc5ed61275bb29413486df554a99063ae55', 0.17299766020664573)]

In [18]:

### We want to get all other songs heard by these Users.

### For that first we take our 'final3' RDD and map it to a key-value RDD.
### Key is the User Name
### Value is the Song Name and Rating.
### This key-value RDD is stored in 'final3KV'

final3KV=final3.map(lambda x: (x[0],(x[1],x[2])))
final3KV.collect()

[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  ('SOBONKR12A58A7A7E0', 0.021624707525830716)),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  ('SOEGIYH12A6D4FC0E3', 0.021624707525830716)),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  ('SOFLJQZ12A6D4FADA6', 0.021624707525830716)),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  ('SOHTKMO12AB01843B0', 0.021624707525830716)),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  ('SODQZCY12A6D4F9D11', 0.021624707525830716)),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  ('SOXLOQG12AF72A2D55', 0.021624707525830716)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  ('SOUVUHC12A67020E3B', 0.021624707525830716)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  ('SOUQERE12A58A75633', 0.021624707525830716)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  ('SOIPJAX12A8C141A2D', 0.021624707525830716)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  ('SOEFCDJ12AB0185FA0', 0.04324941505166143)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  ('S

In [19]:

### We join 'final3KV' RDD to our 'TKV' RDD, key being User Name.
### We now have all songs heard by the selected Users that like our song. 

TT=final3KV.join(TKV)
TT.collect()

[('38184d13862f7466d8318557b921e91720fdef40',
  (('SOTIYYT12AC9072CA7', 0.08649883010332286), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SORAOUS12A8C13950A', 0.06487412257749214), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SORHKKL12AC9072CB6', 0.08649883010332286), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SOMDBJK12A6D4F873A', 0.1297482451549843), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SOTIGPL12AC9072CBB', 0.08649883010332286), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SOHUPEL12AC9072CAC', 0.06487412257749214), 0.1297482451549843)),
 ('79467772a02de3abd8c2f40c88b6b482830b9887',
  (('SOSQIPK12A6D4F966A', 0.21624707525830714), 0.3243706128874607)),
 ('79467772a02de3abd8c2f40c88b6b482830b9887',
  (('SOLMKTS12A8AE46C27', 0.2811211978357993), 0.3243706128874607)),
 ('79467772a02de3abd8c2f40c88b6b482830b9887',
  (('SOMDBJK12A6D4F873A', 0.

In [20]:

### Lastly, we want the songs heard by these users that they rated higher than our song.
### The filter keeps each song whose rating is higher than that same user's rating of our song.
### We then map the result to keep only the recommended Song Name and its rating.
### Finally, we sort this RDD in descending order of rating to get the top recommendations for our user.


FINAL3=TT.filter(lambda x: (x[1][0][1]>x[1][1])).map(lambda x: (x[1][0][0],x[1][0][1])).sortBy(lambda x: -x[1])

### Our final answer for Step 3 is stored in the 'FINAL3' RDD. 
### We display the top 5 song recommendations and the ratings other users gave them. 

FINAL3.take(5)


[('SOHTKMO12AB01843B0', 1.059610668765705),
 ('SOSUIAK12AB01850CB', 0.6487412257749214),
 ('SOPLDTD12AB0184B2F', 0.4108694429907836),
 ('SOLGPOU12A58A7EA20', 0.36762002793912213),
 ('SOWFGOB12A58A7A7FD', 0.34599532041329145)]
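The filtering pipeline of cells In [16] to In [20] can be sketched in plain Python over (user, song, rating) tuples. The users, songs and ratings are hypothetical toy values.

```python
def recommend(ratings, user_id, song_id, k=5):
    """Sketch of the Step 3 filters on (user, song, rating) tuples."""
    # Rating our user gave the seed song (like cell In [16]).
    n = next(r for u, s, r in ratings if u == user_id and s == song_id)
    # Other users who rated the seed song higher (like cell In [17]).
    matched = {u: r for u, s, r in ratings
               if u != user_id and s == song_id and r > n}
    # Their songs rated above their own rating of the seed (like In [18]-In [20]).
    candidates = [(s, r) for u, s, r in ratings
                  if u in matched and r > matched[u]]
    return sorted(candidates, key=lambda x: -x[1])[:k]

ratings = [
    ("me", "seed", 0.1),
    ("a", "seed", 0.3), ("a", "x", 0.5), ("a", "y", 0.2),
    ("b", "seed", 0.05), ("b", "z", 0.9),  # b rated the seed lower: ignored
    ("c", "seed", 0.2), ("c", "w", 0.4),
]
print(recommend(ratings, "me", "seed"))  # [('x', 0.5), ('w', 0.4)]
```

Like the notebook's 'FINAL3', this sketch does not remove songs the user has already heard. It also keeps repeated songs when several matched users recommend the same one.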

# Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If the top-5 user set has a user appearing more than once, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list. 
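Point 3 above amounts to a greedy scan over the pairs sorted by similarity, skipping any pair that reuses a user who was already selected. The plain-Python sketch below uses hypothetical scores. It is one way to meet the requirement and is not necessarily how this notebook implements it.

```python
def top_pairs_without_overlap(scored_pairs, k=5):
    """Greedily pick up to k (user_a, user_b, similarity) pairs with no shared users."""
    chosen, used = [], set()
    for a, b, sim in sorted(scored_pairs, key=lambda p: -p[2]):
        if a in used or b in used:
            continue  # a user would appear twice: take the next best pair
        chosen.append((a, b, sim))
        used.update((a, b))
        if len(chosen) == k:
            break
    return chosen

pairs = [("u1", "u2", 0.9), ("u1", "u3", 0.8), ("u3", "u4", 0.7),
         ("u2", "u5", 0.6), ("u5", "u6", 0.5)]
print(top_pairs_without_overlap(pairs, k=2))
# [('u1', 'u2', 0.9), ('u3', 'u4', 0.7)]
```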

In [21]:
### From the pyspark.ml library we import HashingTF and IDF. 
### These build the TF-IDF feature vectors on which the cosine similarities are computed. 
### We also import Normalizer to L2-normalize those vectors. 
### We import pyspark.sql.functions as psf.
### We import desc to sort our results in descending order.
### We import DoubleType because the UDF returns each cosine similarity as a double.

import pyspark.sql.functions as psf
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Normalizer
from pyspark.sql.functions import desc
from pyspark.sql.types import DoubleType
from pyspark.sql import SparkSession

spark = SparkSession(sc)

### We first read our txt file into an RDD, splitting each line on \t. 

triplet_rdd = sc.textFile("kaggle_visible_evaluation_triplets.txt").map(lambda line: line.split("\t")) 

### We take the first 5000 rows of the file
### and perform Question 4 on those rows only.

temp=sc.parallelize(triplet_rdd.take(5000))
temp.top(10)

[['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SOYVWQO12A6D4FBC8A', '1'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SOWNVIV12AB0184846', '1'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SOSXLTC12AF72A7F54', '1'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SORVKAX12A8C1343AA', '1'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SOQLHST12AB0184A27', '1'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SOPRYBV12A6701E116', '7'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SONUSLR12A5891DCCB', '1'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SONQCXC12A6D4F6A37', '1'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SOIOFZH12A6D4FCE35', '5'],
 ['ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7', 'SOHCJYB12A58A7C8D6', '1']]

## Step 4. Part 1
<h4>We repeated the song ID (second column) as many times as its play count (third column). This way each user's list contains every song exactly as many times as it was played.</h4>

<u>For example, a row of the original RDD looks like this:</u><br><br>
UserID: '91b8fac7dc5e03f6cfaf6e2aa7171f14a8354d62'<br>
SongID: 'SOYPQYA12A6D4FB8F4'<br>
Times played: '3'
<br><br>
<u>Our new RDD (temp2) will have:</u>

Column 1: '91b8fac7dc5e03f6cfaf6e2aa7171f14a8354d62'<br>
Column 2: 'SOYPQYA12A6D4FB8F4,SOYPQYA12A6D4FB8F4,SOYPQYA12A6D4FB8F4'

We then reduced the RDD by the Key which is UserID. Therefore all the songs the user has ever listened to will now be in a single row in the temp3 RDD.
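Repeating each song ID by its play count turns a user's history into a "document" whose term counts equal the play counts. HashingTF later counts those terms, up to hash collisions. The plain-Python sketch below reproduces the string expression from cell In [22] with the example row above. The second song and its count are hypothetical.

```python
from collections import Counter

def expand(song_id, times_played):
    # Same expression as in cell In [22]: repeat "song," and drop the last comma.
    return ((song_id + ",") * int(times_played))[:-1]

row = expand("SOYPQYA12A6D4FB8F4", "3")
print(row)  # SOYPQYA12A6D4FB8F4,SOYPQYA12A6D4FB8F4,SOYPQYA12A6D4FB8F4

# After the per-user reduce and split(','), counting tokens recovers play counts.
history = ",".join([row, expand("SOHCJYB12A58A7C8D6", "1")])
print(Counter(history.split(",")))
```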


In [22]:
temp2 = temp.map(lambda x: (x[0],((x[1]+',')*int(x[2]))[:-1]))
temp3 = temp2.reduceByKey(lambda x, y: x + "," + y)
temp3.top(2)

[('ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7',
  'SOHCJYB12A58A7C8D6,SODJTHN12AF72A8FCD,SOWNVIV12AB0184846,SOEGIYH12A6D4FC0E3,SOSXLTC12AF72A7F54,SORVKAX12A8C1343AA,SONQCXC12A6D4F6A37,SOPRYBV12A6701E116,SOPRYBV12A6701E116,SOPRYBV12A6701E116,SOPRYBV12A6701E116,SOPRYBV12A6701E116,SOPRYBV12A6701E116,SOPRYBV12A6701E116,SONUSLR12A5891DCCB,SOIOFZH12A6D4FCE35,SOIOFZH12A6D4FCE35,SOIOFZH12A6D4FCE35,SOIOFZH12A6D4FCE35,SOIOFZH12A6D4FCE35,SOYVWQO12A6D4FBC8A,SOQLHST12AB0184A27'),
 ('fee1f31d583cb7e79b217661dc40f287bf65a13e',
  'SODYQBI12A8AE48DB4,SODYQBI12A8AE48DB4,SODYQBI12A8AE48DB4,SODYQBI12A8AE48DB4,SODYQBI12A8AE48DB4,SOSJOEN12A8C13B934,SOSJOEN12A8C13B934,SOSJOEN12A8C13B934,SOSJOEN12A8C13B934,SOSJOEN12A8C13B934,SOSJOEN12A8C13B934,SOSJOEN12A8C13B934,SOKHOJR12A8C14229D,SOKHOJR12A8C14229D,SOKHOJR12A8C14229D,SOKHOJR12A8C14229D,SOKHOJR12A8C14229D,SOQHQDL12A6D4F905D,SOMYETD12A6D4F8541,SOMYETD12A6D4F8541,SOMYETD12A6D4F8541,SOMYETD12A6D4F8541,SOMYETD12A6D4F8541,SOMYETD12A6D4F8541,SOMYETD12A6D4F8541,SOMYET

In [23]:
### hasattr() checks that temp3 has a toDF method; PySpark adds RDD.toDF once a SparkSession has been created.
### We then convert the temp3 RDD to a Spark DataFrame, which makes the feature computations easier. 
### The DataFrame has 2 columns: ID and Songs_Played (split back into a list of song IDs). 

hasattr(temp3, "toDF")
df = temp3.toDF(["ID", "Songs_Played"]).withColumn("Songs_Played", psf.split(psf.regexp_replace("Songs_Played", " ", ""), ','))
df.head(2)

[Row(ID='6530c4fc41b9110de5d39fe0355fa103c66385f0', Songs_Played=['SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOXVVSM12A8C142224', 'SOQLUTQ12A8AE48037', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOVQVNI12A58A778B6', 'SOSIZFO12A58A79934', 'SOSIZFO12A58A79934', 'SORYRWS12AB0186CD9', 'SORYRWS12AB0186CD9', 'SOTIJUH12A58A7B37C', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOKLRPJ12A8C13C3FE', 'SOKLRPJ12A8C13C3FE', 'SOZVCRW12A67ADA0B7', 'SOLEEUJ12A6701C787', 'SOPQLBY12A6310E992', 'SOPQLBY12A6310E992', 'SOXAQJS12AB018CBF7', 'SOXAQJS12AB018CBF7', 'SOPXKYD12A6D4FA876', 'SOPXKYD12

In [24]:
### Next we compute the hashed term frequencies (TF) with HashingTF and store them as 'tf'.
### We then fit the inverse document frequency (IDF) on the DataFrame and store the TF-IDF result as 'tfidf'.

hashingTF = HashingTF(inputCol="Songs_Played", outputCol="tf")
tf = hashingTF.transform(df)
idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
tfidf = idf.transform(tf)
tfidf.head(2)

[Row(ID='6530c4fc41b9110de5d39fe0355fa103c66385f0', Songs_Played=['SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOXVVSM12A8C142224', 'SOQLUTQ12A8AE48037', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOVQVNI12A58A778B6', 'SOSIZFO12A58A79934', 'SOSIZFO12A58A79934', 'SORYRWS12AB0186CD9', 'SORYRWS12AB0186CD9', 'SOTIJUH12A58A7B37C', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOKLRPJ12A8C13C3FE', 'SOKLRPJ12A8C13C3FE', 'SOZVCRW12A67ADA0B7', 'SOLEEUJ12A6701C787', 'SOPQLBY12A6310E992', 'SOPQLBY12A6310E992', 'SOXAQJS12AB018CBF7', 'SOXAQJS12AB018CBF7', 'SOPXKYD12A6D4FA876', 'SOPXKYD12
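The IDF step can be sketched in plain Python. Spark ML's IDF uses the smoothed formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents (users here) and df(t) is the number of users who played song t. This sketch skips hashing and keys vectors directly by song ID, so unlike HashingTF it has no collisions. The toy data is hypothetical.

```python
import math
from collections import Counter

def tf_idf(docs):
    """docs: {user: [song, ...]} -> {user: {song: tf * idf}}.

    Uses Spark ML's smoothed IDF, log((m + 1) / (df + 1)), but keys the
    vectors by song ID instead of hashing them into numFeatures buckets.
    """
    m = len(docs)
    tf = {user: Counter(songs) for user, songs in docs.items()}
    df = Counter(song for counts in tf.values() for song in counts)
    idf = {song: math.log((m + 1) / (d + 1)) for song, d in df.items()}
    return {user: {s: c * idf[s] for s, c in counts.items()}
            for user, counts in tf.items()}

docs = {"u1": ["A", "A", "B"], "u2": ["B", "C"]}
vectors = tf_idf(docs)
print(vectors["u1"]["A"])  # 2 * log(3/2), about 0.811
print(vectors["u1"]["B"])  # 0.0, since B appears in every document
```

Under this formula, a song heard by every user in the sample gets weight zero and adds nothing to any pair's similarity.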

In [25]:
### Using Normalizer (which uses the L2 norm by default), we L2-normalize the 'tfidf' feature vectors and store the result as 'data'. 

normalizer = Normalizer(inputCol="feature", outputCol="norm")
data = normalizer.transform(tfidf)
data.head(2)

[Row(ID='6530c4fc41b9110de5d39fe0355fa103c66385f0', Songs_Played=['SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOCHADN12A6310ED94', 'SOXVVSM12A8C142224', 'SOQLUTQ12A8AE48037', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOWEJXA12A6701C574', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOIDFJR12A6D4F7EB0', 'SOVQVNI12A58A778B6', 'SOSIZFO12A58A79934', 'SOSIZFO12A58A79934', 'SORYRWS12AB0186CD9', 'SORYRWS12AB0186CD9', 'SOTIJUH12A58A7B37C', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOERYLG12A6701F07F', 'SOKLRPJ12A8C13C3FE', 'SOKLRPJ12A8C13C3FE', 'SOZVCRW12A67ADA0B7', 'SOLEEUJ12A6701C787', 'SOPQLBY12A6310E992', 'SOPQLBY12A6310E992', 'SOXAQJS12AB018CBF7', 'SOXAQJS12AB018CBF7', 'SOPXKYD12A6D4FA876', 'SOPXKYD12
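L2 normalization divides each vector by its Euclidean length. A minimal sketch on a sparse vector stored as a hypothetical `{index: value}` dict (Spark's Normalizer uses p = 2 by default); returning an all-zero vector unchanged is this sketch's own choice for that edge case.

```python
import math


def l2_normalize(vec):
    """Return vec / ||vec||_2 for a sparse {index: value} vector; a zero vector is returned as-is."""
    norm = math.sqrt(sum(v * v for v in vec.values()))
    if norm == 0.0:
        return dict(vec)
    return {i: v / norm for i, v in vec.items()}


unit = l2_normalize({0: 3.0, 5: 4.0})  # length 5, so components become 3/5 and 4/5
```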

In [26]:
### After normalization every user vector has unit length, so the dot product of two users' 'norm' vectors is their cosine similarity.
### We self-join 'data' on i.ID < j.ID, which produces each unordered pair of users exactly once, and compute the dot product with a UDF.
### The results are stored in the 'finaldf' DataFrame, which has three columns: User 1, User 2 and Cosine Similarity.

### Step 4 Part 1 is stored in the 'finaldf' Spark DataFrame.
### We show the pairs in descending order of Cosine Similarity (first 1000 rows).

dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType())
finaldf=data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
    .select(
        psf.col("i.ID").alias("User 1"), 
        psf.col("j.ID").alias("User 2"), 
        dot_udf("i.norm", "j.norm").alias("Cosine Similarity")).sort(desc("Cosine Similarity"))

finaldf.show(1000)

+--------------------+--------------------+--------------------+
|              User 1|              User 2|   Cosine Similarity|
+--------------------+--------------------+--------------------+
|53f02466e42999daf...|eb3bfe7f67fb59d44...|  0.6664200920551404|
|00f7c493ee6488499...|eb3bfe7f67fb59d44...|  0.5739153330964452|
|00f7c493ee6488499...|a344d1f94e0f6d578...|  0.5545182603991983|
|62eb207ebff66e00a...|8c9f3dd16c120e68b...|  0.5523960920249202|
|00f7c493ee6488499...|53f02466e42999daf...|  0.5426874103603516|
|07260aa795b86382f...|bcb1e6d620cf52239...|  0.5295798772858136|
|3a151b699ce726e1c...|fbcafcd3521213763...|  0.5095908880845608|
|a344d1f94e0f6d578...|c732f882aa8d6db3b...| 0.49942772701886595|
|7b104f995943ca5f5...|9d6aec26a2b0e0f10...|  0.4384438636975087|
|b6fe63bd640050b92...|bcb1e6d620cf52239...|  0.4203524231588098|
|00f7c493ee6488499...|bcb1e6d620cf52239...| 0.40108770930159543|
|49bf7109eba57fa27...|c34670d9c1718361f...| 0.40097671623245884|
|00f7c493ee6488499...|c73
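Why the dot product of the 'norm' vectors is a cosine similarity: cos(u, v) = u·v / (||u|| ||v||), which reduces to u·v when both vectors have unit length. The sketch below reproduces Step 4 Part 1 in plain Python on three hypothetical users; `itertools.combinations` over sorted IDs plays the role of the `i.ID < j.ID` self-join condition and emits each unordered pair once.

```python
import math
from itertools import combinations


def dot(a, b):
    """Dot product of two sparse {index: value} vectors (iterates over the shorter one)."""
    if len(a) > len(b):
        a, b = b, a
    return sum(v * b.get(i, 0.0) for i, v in a.items())


def normalize(vec):
    norm = math.sqrt(dot(vec, vec))
    return {i: v / norm for i, v in vec.items()} if norm else dict(vec)


# Hypothetical TF-IDF vectors for three users.
users = {"u1": {0: 1.0, 1: 1.0}, "u2": {0: 1.0}, "u3": {2: 5.0}}
unit = {uid: normalize(v) for uid, v in users.items()}

# Each unordered pair once (id1 < id2), scored and sorted by descending similarity.
pairs = sorted(
    ((a, b, dot(unit[a], unit[b])) for a, b in combinations(sorted(unit), 2)),
    key=lambda t: -t[2],
)
```

With n users this yields n(n - 1)/2 pairs, so the self-join grows quadratically with the number of users.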

## Step 4. Part 2

In [27]:

### Step 4 Part 2.
### We are sorting the table based on Cosine Similarity. 
### We are displaying the top 5 pairs. 

finaldf.sort(desc("Cosine Similarity")).show(5)

+--------------------+--------------------+------------------+
|              User 1|              User 2| Cosine Similarity|
+--------------------+--------------------+------------------+
|53f02466e42999daf...|eb3bfe7f67fb59d44...|0.6664200920551404|
|00f7c493ee6488499...|eb3bfe7f67fb59d44...|0.5739153330964452|
|00f7c493ee6488499...|a344d1f94e0f6d578...|0.5545182603991983|
|62eb207ebff66e00a...|8c9f3dd16c120e68b...|0.5523960920249202|
|00f7c493ee6488499...|53f02466e42999daf...|0.5426874103603516|
+--------------------+--------------------+------------------+
only showing top 5 rows
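Since 'finaldf' was already sorted in descending order when it was built, the extra sort in this cell should be redundant; `show(5)` alone would print the same rows. When only the top k rows are needed, a partial selection avoids sorting everything (on an RDD, `takeOrdered(5, key=lambda r: -r[2])` does this). A plain-Python illustration with `heapq.nlargest` on hypothetical rows:

```python
import heapq

# Hypothetical (user1, user2, similarity) rows standing in for 'finaldf'.
rows = [("a", "b", 0.2), ("a", "c", 0.9), ("b", "c", 0.5),
        ("a", "d", 0.1), ("b", "d", 0.7), ("c", "d", 0.3)]

# Select the 5 highest-similarity rows without fully sorting the list.
top5 = heapq.nlargest(5, rows, key=lambda r: r[2])
```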



## Step 4. Part 3

In [28]:
### Step 4 Part 3: If a user appears more than once in the top-5 set,
### ignore that pair and take the next best pair from the sorted list.

### First we convert the DataFrame back to an RDD, 'finaldfrdd',
### and map it to a key-value RDD:
### Key: User 1
### Value: (User 2, Cosine Similarity)

finaldfrdd = finaldf.rdd
finaldfrdd = finaldfrdd.map(lambda x: (x[0],(x[1],x[2])))

### Next we apply reduceByKey with x + y. Because the values are tuples, '+' concatenates them,
### so each User1 key ends up with one flat tuple (User2, sim, User2', sim', ...).
### The map then keeps only the first (User2, sim) pair of that tuple,
### which removes the duplicate entries in the User1 column.

### After the reduce, the RDD looks like:
### User1, (User2, Cosine Similarity)

### In the same map we swap the User1 and User2 columns for the next step.

### The 'tempfinal' RDD therefore looks like:
### User2, (User1, Cosine Similarity)

tempfinal = finaldfrdd.reduceByKey(lambda x,y:x+y).map(lambda x: (x[1][0],(x[0],x[1][1])))
tempfinal.collect()

[('eb3bfe7f67fb59d4427cca56932a4d83caaaaa76',
  ('53f02466e42999daf6ab54b38a532b5435024f22', 0.6664200920551404)),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588',
  ('3d28488a659d1cacc598d17074b233985271c65e', 0.304061936030354)),
 ('d8c425c171d9d67dc33240f605f6beed96e5e6ff',
  ('18ce1da0e1017e31baaa5f80afa64ee3c7fab379', 0.2922465023031923)),
 ('ef0d21935a2f8ae90571dbfab800f87fa5b38769',
  ('6e6b836e0a82b8d77865041d83f6b35bcc491f36', 0.22535044910511282)),
 ('8061f61372876878c2d67bc49b3ddbd2c83d69e2',
  ('0c65a060faaf2c3f9a6aed6c9732131709c33d55', 0.20766124966037033)),
 ('f89f240b78ca4c630952e95fc5333d5ecc821a74',
  ('ab35eaead21afe1f495add68a8b27dcb63573640', 0.20103793875363785)),
 ('acf60ae21bc96de4c3ec0649087aeef834bdc035',
  ('0c7dfda650da200c263522ca177e374a323584a9', 0.1831033398267445)),
 ('e4704b7e3ee04c9fb2164ba9dd4742d57c6c7542',
  ('55132cb6dd401983fbe2a8d70d7a8c6c74549f8c', 0.15747272058474718)),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588',
  ('baf2fe5885ab93fbbdb7fecc669
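What `reduceByKey(lambda x, y: x + y)` does to these values can be checked in plain Python: on tuples `+` concatenates, so the reduced value is one flat tuple and `x[1][0], x[1][1]` pick out its first pair. One caveat: the Spark documentation asks for an associative and commutative function in `reduceByKey`, and tuple concatenation is not commutative, so the pair that ends up first is not guaranteed to be the highest-scoring one. The `keep_best` function below is a sketch of an order-independent alternative (not what the notebook uses) that keeps the pair with the larger similarity and could be passed to `reduceByKey` directly. The IDs are hypothetical.

```python
from functools import reduce

# Hypothetical values for one User1 key, already in descending-similarity order.
values = [("u2", 0.9), ("u3", 0.5), ("u4", 0.1)]

# Equivalent of reduceByKey(lambda x, y: x + y) for a single key: tuple concatenation.
merged = reduce(lambda x, y: x + y, values)
first_pair = (merged[0], merged[1])  # what the notebook's map keeps


def keep_best(x, y):
    """Keep the (user, similarity) pair with the higher similarity (ties: first argument)."""
    return x if x[1] >= y[1] else y


# The result no longer depends on the order in which values are merged.
best_any_order = reduce(keep_best, [values[2], values[0], values[1]])
```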

In [29]:

### With User1 now unique, we do the same for User2.
### 'tempfinal' is already keyed by User2, so the same reduceByKey-and-keep-first step
### leaves one row per distinct User2, flattened to (User2, User1, Cosine Similarity).
### We save this in the 'userid2dis' RDD.

userid2dis = tempfinal.reduceByKey(lambda x,y:x+y).map(lambda x: (x[0],x[1][0],x[1][1]))
userid2dis.collect()

[('8061f61372876878c2d67bc49b3ddbd2c83d69e2',
  '0c65a060faaf2c3f9a6aed6c9732131709c33d55',
  0.20766124966037033),
 ('f89f240b78ca4c630952e95fc5333d5ecc821a74',
  'ab35eaead21afe1f495add68a8b27dcb63573640',
  0.20103793875363785),
 ('f3c6cc1430512dcb4dd143e88b39c21b30c46452',
  'e371f36df50e1d3dd95e58f4a9a37e4fdfc607f2',
  0.024558413383363724),
 ('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8',
  'b7c655fc42586672170ee28be86ad970940497c1',
  0.0),
 ('daa9e7e53ae787ab4f1b5518b695198947d821a2',
  'd6d0c7632cf83f30c03f196fba1a157793fbaa96',
  0.0),
 ('ab35eaead21afe1f495add68a8b27dcb63573640',
  '041d75b5ec1dc50634cce4a19f6b29f61fc5d2bf',
  0.08644165826819437),
 ('6acb8f2e9996c43ae6cc1cbfe83662deca9347f9',
  '30e4a688e6fc9c8bfe55998af3996a909ae34449',
  0.011487121540904734),
 ('ff8891b901eed1a3abc9bd4c53a17d2bcdc0ead7',
  'fee1f31d583cb7e79b217661dc40f287bf65a13e',
  0.0),
 ('dd87b5cc43b72578298a6ffadfe8155c567f31b8',
  'd8c425c171d9d67dc33240f605f6beed96e5e6ff',
  0.3262175868126072),
 ('

In [30]:

### We extract the User2 values (the first field of each 'userid2dis' row)
### and collect them into the Python list 'user2list'.

user2list = userid2dis.map(lambda x:x[0]).collect()
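The filter in the next cell tests `x[1] not in user2list` for every row, and membership in a Python list is a linear scan. Converting the list to a set once makes each check constant time on average; on a cluster, the set could also be shipped to the executors once with `sc.broadcast(set(user2list))` and read through `.value`. A plain-Python sketch of the same filter on hypothetical (User2, User1, similarity) rows:

```python
# Hypothetical rows shaped like 'userid2dis': (User2, User1, Cosine Similarity).
rows = [("a", "b", 0.4), ("c", "d", 0.9), ("e", "a", 0.2)]

user2list = [r[0] for r in rows]  # same content the notebook collects
user2set = set(user2list)         # O(1) average membership instead of O(len(list))

# Keep only rows whose User1 value never appears as a User2.
kept = [r for r in rows if r[1] not in user2set]
```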

In [31]:

### Next we keep only the rows whose User1 value does not appear anywhere in 'user2list',
### so that no user occurs in both columns, and sort by Cosine Similarity in descending order.
### This is saved in the 'finaluser' RDD.

finaluser = userid2dis.filter(lambda x:x[1] not in user2list).sortBy(lambda x:-float(x[2]))
finaluser.collect()

### We take only top-5 Cosine similarity pairs.
### Step 4 Part 3 Final Answer is stored in 'final43'

final43 = sc.parallelize(finaluser.take(5))
final43.collect()

[('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  '4782ec75607a6f2a93b7b104438e69a5e3f7626d',
  0.32204771497082496),
 ('9bafadb4f8e8b1011bd9af3984821822555b9916',
  '202c63cd3568680561e84d33bc35740d662efccf',
  0.30953372604591745),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588',
  '3d28488a659d1cacc598d17074b233985271c65e',
  0.304061936030354),
 ('d8c425c171d9d67dc33240f605f6beed96e5e6ff',
  '18ce1da0e1017e31baaa5f80afa64ee3c7fab379',
  0.2922465023031923),
 ('d49742d767f7119ea6618c12fe432b660b129ebe',
  'a5d92e23cf3f711dfc473f1c3b296492ec02effd',
  0.2792408080977978)]
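For comparison, the rule stated at the start of Step 4 Part 3 (skip a pair if either of its users already appears, then take the next best pair) can also be read as a single greedy pass over the pairs in descending order. The sketch below is that alternative, not the method used in the cells above, and it can give a different answer: it always keeps the highest-scoring pair in 'finaldf', which the result above does not contain. It assumes the sorted pairs can be streamed to the driver (for example with `finaldf.rdd.toLocalIterator()`); the function name and user labels are hypothetical.

```python
def top_k_disjoint_pairs(sorted_pairs, k=5):
    """Walk (user1, user2, sim) pairs in descending sim order and keep a pair only
    if neither user has been used by a previously kept pair."""
    used = set()
    chosen = []
    for u1, u2, sim in sorted_pairs:
        if u1 in used or u2 in used:
            continue  # overlap: skip and move on to the next best pair
        chosen.append((u1, u2, sim))
        used.update((u1, u2))
        if len(chosen) == k:
            break
    return chosen


pairs = [("A", "B", 0.67), ("C", "B", 0.57), ("C", "D", 0.55), ("E", "F", 0.55),
         ("C", "A", 0.54), ("G", "H", 0.53), ("I", "J", 0.51)]
top5 = top_k_disjoint_pairs(pairs, k=5)
```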

## Step 4. Part 4

In [32]:
### Step 4 Part 4: For a given user_id, identify the top-5 similar users and,
### from them, song recommendations drawn from the other users' lists.


### First we convert the DataFrame back to an RDD, 'finalrdd',
### map each row to a plain (User1, User2, Cosine Similarity) tuple, and collect it to check the result.
### Because 'finaldf' was sorted, 'finalrdd' is already in descending order of Cosine Similarity.

finalrdd=finaldf.rdd
finalrdd=finalrdd.map(lambda x: (x[0],x[1],x[2]))
finalrdd.collect()

[('53f02466e42999daf6ab54b38a532b5435024f22',
  'eb3bfe7f67fb59d4427cca56932a4d83caaaaa76',
  0.6664200920551404),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  'eb3bfe7f67fb59d4427cca56932a4d83caaaaa76',
  0.5739153330964452),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  'a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  0.5545182603991983),
 ('62eb207ebff66e00a42ab80b06e0e3a2a2c13554',
  '8c9f3dd16c120e68bdd1dd34bfb7c5fd3d6dc487',
  0.5523960920249202),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  '53f02466e42999daf6ab54b38a532b5435024f22',
  0.5426874103603516),
 ('07260aa795b86382fbddea37b7b8f6f7d5a1db0a',
  'bcb1e6d620cf522390d5c92bae26936928e0b588',
  0.5295798772858136),
 ('3a151b699ce726e1cff56a5bb069aec50efcba45',
  'fbcafcd35212137638b34ce4d78981b994e2aefa',
  0.5095908880845608),
 ('a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  'c732f882aa8d6db3bfaf8037d6418f27d3e07fc8',
  0.49942772701886595),
 ('7b104f995943ca5f5df3d6b71aba712cd6e81632',
  '9d6aec26a2b0e0f10a58fe2ab64190

In [33]:
### We now use the given user ID.
### We keep every row of 'finalrdd' in which the given user is either User1 or User2,
### and store these rows in the 'givenusersimilar' RDD.
### Since 'finalrdd' is sorted, taking the first 5 rows gives the top-5 most similar users.

givenuserid="6493c305190b52657d4ea3f4adf367ffcf3427af"

givenusersimilar=finalrdd.filter(lambda x: (x[0]==givenuserid or x[1]==givenuserid))
givenusersimilar=sc.parallelize(givenusersimilar.take(5))
givenusersimilar.collect()

[('6493c305190b52657d4ea3f4adf367ffcf3427af',
  'a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  0.0353947816511668),
 ('6493c305190b52657d4ea3f4adf367ffcf3427af',
  'c732f882aa8d6db3bfaf8037d6418f27d3e07fc8',
  0.030606850597977533),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  '6493c305190b52657d4ea3f4adf367ffcf3427af',
  0.025396128374460775),
 ('6493c305190b52657d4ea3f4adf367ffcf3427af',
  'a5d92e23cf3f711dfc473f1c3b296492ec02effd',
  0.018185699189073007),
 ('6493c305190b52657d4ea3f4adf367ffcf3427af',
  'd68dc6fc25248234590d7668a11e3335534ae4b4',
  0.015828864791743773)]

In [34]:
### If the given user is in the User1 column, we extract User2 and the Cosine Similarity.
### We save this in 'n'.

### If the given user is in the User2 column, we extract User1 and the Cosine Similarity.
### We save this in 'm'.

### The union of 'n' and 'm' gives the top-5 users most similar to the given user, with their Cosine Similarities.
### We save this in the 'z' RDD.

n=givenusersimilar.filter(lambda x: x[0]==givenuserid).map(lambda x: (x[1],x[2]))
m=givenusersimilar.filter(lambda x: x[1]==givenuserid).map(lambda x: (x[0],x[2]))
z=n.union(m)
z.collect()

[('a344d1f94e0f6d5783860f62d8bc8ba2fec3d530', 0.0353947816511668),
 ('c732f882aa8d6db3bfaf8037d6418f27d3e07fc8', 0.030606850597977533),
 ('a5d92e23cf3f711dfc473f1c3b296492ec02effd', 0.018185699189073007),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4', 0.015828864791743773),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf', 0.025396128374460775)]
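Cells In [33] and In [34] can also be read as one pass over the sorted pairs: keep the rows that mention the given user and return the other user in each. A plain-Python sketch (the function name `top_neighbors` and the IDs are hypothetical). Doing it in one pass keeps the neighbors in descending similarity order, whereas `n.union(m)` lists all User1 matches before all User2 matches, which is why 00f7c493... appears last in 'z' above.

```python
def top_neighbors(sorted_pairs, user_id, k=5):
    """From (user1, user2, sim) rows sorted by descending sim, return the k
    highest-scoring (other_user, sim) pairs that involve user_id."""
    neighbors = []
    for u1, u2, sim in sorted_pairs:
        if u1 == user_id:
            neighbors.append((u2, sim))
        elif u2 == user_id:
            neighbors.append((u1, sim))
        if len(neighbors) == k:
            break
    return neighbors


pairs = [("a", "x", 0.9), ("x", "b", 0.8), ("c", "d", 0.7), ("x", "e", 0.6)]
top2 = top_neighbors(pairs, "x", k=2)
```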

In [35]:

### We go back to the original 'triplet_rdd' RDD, which holds the users, songs and play counts.
### As in Step 3, we map it to (User, Song, Rating), where the rating is the play count divided by 'ssum' and scaled by 100000.
### We store this in 'finalee'.

### We then turn 'finalee' into a key-value RDD:
### Key: User
### Value: (Song, Rating)

### Joining it with 'z' keeps only the top-5 similar users, with all their songs, their ratings and their Cosine Similarity to the given user.
### We save this in 'final4'.

finalee = triplet_rdd.map(lambda x: (x[0],x[1],(int(x[2])/ssum)*100000))
finalee=finalee.map(lambda x: (x[0],(x[1],x[2])))
final4=finalee.join(z)
final4.collect()

[('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOFRQTD12A81C233C0', 0.021624707525830716), 0.015828864791743773)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOZQIUZ12A8C13CFBE', 0.021624707525830716), 0.015828864791743773)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOKQNYH12A6D4FA5D3', 0.021624707525830716), 0.015828864791743773)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOQDMED12A67ADE731', 0.021624707525830716), 0.015828864791743773)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOAXGDH12A8C13F8A1', 0.021624707525830716), 0.015828864791743773)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOAAGFH12A8C13D072', 0.021624707525830716), 0.015828864791743773)),
 ('a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  (('SOFRQTD12A81C233C0', 0.10812353762915357), 0.0353947816511668)),
 ('a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  (('SOIEBXT12A8C1341E1', 0.021624707525830716), 0.0353947816511668)),
 ('a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  (('SO
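`join` on pair RDDs is an inner join: a row of 'finalee' survives only if its user key also appears in 'z', and each surviving row is paired with that user's similarity. A plain-Python sketch of those semantics using a hash index on the smaller side (the `inner_join` name and the data are hypothetical):

```python
from collections import defaultdict


def inner_join(left, right):
    """Inner join of (key, value) lists, shaped like RDD.join output:
    one (key, (left_value, right_value)) per matching combination."""
    index = defaultdict(list)
    for key, value in right:  # build side, e.g. the small 'z' RDD
        index[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]


# Hypothetical (user, (song, rating)) rows and (user, similarity) neighbors.
finalee_rows = [("u1", ("s1", 0.2)), ("u1", ("s2", 0.1)), ("u9", ("s3", 0.5))]
z_rows = [("u1", 0.03)]
joined = inner_join(finalee_rows, z_rows)  # "u9" has no match and is dropped
```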

In [36]:

### We map 'final4' to keep only the recommended songs from the other users' lists and their ratings,
### then sort them in descending order of rating.
### We take the top 5 into a new RDD called 'final44' and display it.

### The final answer of Step 4 Part 4 is stored in the 'final44' RDD.

final4=final4.map(lambda x: (x[1][0][0],x[1][0][1])).sortBy(lambda x: -x[1])
final44 = sc.parallelize(final4.take(5))
final44.collect()

[('SOETDMS12AB017F8E7', 0.19462236773247643),
 ('SONNBUM12A6D4FDDB4', 0.17299766020664573),
 ('SOFRQTD12A81C233C0', 0.151372952680815),
 ('SOFRQTD12A81C233C0', 0.10812353762915357),
 ('SOVDSJC12A58A7A271', 0.10812353762915357)]
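A few properties of 'final44' are worth noting. Rows are ranked one at a time, so a song that several neighbors played can appear more than once (SOFRQTD12A81C233C0 does above); the Cosine Similarity carried in 'final4' is not used in the ranking; and songs the given user has already played are not filtered out. The sketch below is one possible variant, not the notebook's method: it sums each song's scores across neighbors so every song appears once, optionally weights each rating by the neighbor's similarity, and skips songs in an `already_played` set. In Spark the summing step would correspond to a `reduceByKey(lambda a, b: a + b)` over (song, score) pairs. Function name and data are hypothetical.

```python
from collections import defaultdict


def recommend(joined, already_played=frozenset(), k=5, weight_by_similarity=False):
    """joined: (neighbor_id, ((song_id, rating), similarity)) rows shaped like 'final4'.
    Returns the k best (song_id, score) pairs with one entry per song."""
    scores = defaultdict(float)
    for _neighbor, ((song, rating), sim) in joined:
        if song in already_played:
            continue  # do not recommend what the user already listens to
        scores[song] += rating * sim if weight_by_similarity else rating
    return sorted(scores.items(), key=lambda kv: -kv[1])[:k]


joined = [("n1", (("s1", 0.1), 0.5)), ("n2", (("s1", 0.2), 0.25)),
          ("n1", (("s2", 0.25), 0.5)), ("n2", (("s3", 0.4), 0.25))]
plain = recommend(joined)  # s1 is summed across both neighbors
```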

# Fin.