### Gina Huh, Whitt Hyde, David Kinman

# Project - 1 : Apache Spark 

Instructor: Ramesh Yerraballi
TA: Madhumitha Sakthi
Semester: Fall 2019
Due Date: 11:59pm, Monday 9/16

This project is based on Map-Reduce Framework. In these you will get to work with Spark and will get to know how 
does spark work, what functionalities does spark provide, what does map-reduce framework do and why is it useful. 

In this project you will be implementing a basic song recommender system. You will be given a dataset where there are multiple csv files. These csv files have data corresponding to song play count and song information.

The data you would be using will be provided in a zip file along with this notebook. The __msd.zip__ archive contains '_kaggle_visible_evaluation_triplets.txt_'. We will be using the visible part of the testing data to understand the working on Apache Spark.  The user's listening history is provided as: (user, song, play count). In another file, '_kaggle_songs.txt_', each song is marked using an index for easier representation of songs. 



What to turn in:
A zip folder which will have:
- Jupyter Notebook
- A brief report on what features you used for recommendation. And a brief explanation of flow of your code. For example,  what RDD does what or, why it was created.
- datasets folder with the csv files you are using in your notebook.
- Notebook should use relative path to the csv files in datasets folder.
- Name of the zip folder - <your\_name>\_<your\_partner_name>.zip


This project consists of 4 questions. 
1. Create an RDD with _msd_evalutation_triplets.txt_ and replace the song name with the song index from _msd_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify the popular song based on this rating and recommend songs to user, given user id based on the algorithm used in Movie recommender system from class. 
4. Using Cosine similarity function, identify pair-wise similarity between each pair of users and generate the top 5 most similar users without an overlap in users. 

The above list is the higer level idea about the questions. 

In [1]:
### Starter code ####

##### These lines are to tell jupyter where to find Apache Spark ####
import findspark
findspark.init('C:\\apachespark')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
# There are two configurable parameters
# 1. A cluster URL, namely  local  in this example, which tells Spark how to connect
# to a cluster.  local  is a special value that runs Spark on one thread on the local
# machine, without connecting to a cluster.
# 2. An application name, namely  Movies  in this example. This will identify your
# application on the cluster manager’s UI if you connect to a cluster.
sc = SparkContext(conf = conf)
##### These lines are to tell jupyter where to find Apache Spark ####

In [2]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

triplet_rdd.take(1)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1']]

In [3]:
## Read song file into RDD
songs_rdd = sc.textFile(r"kaggle_songs.txt") \
    .map(lambda line: line.split(" ")) 

songs_rdd.take(1)

[['SOAAADD12AB018A9DD', '1']]

## Step 1: 
Replace song name with song index and identify the number of songs without user history

In [4]:
## Bring the Song name as the first element, make the rest into a list
reordered_triplet_rdd = triplet_rdd.map(lambda x : (x[1], [x[0], x[2]]))
reordered_triplet_rdd.take(3)

[('SOBONKR12A58A7A7E0', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1']),
 ('SOEGIYH12A6D4FC0E3', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1']),
 ('SOFLJQZ12A6D4FADA6', ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1'])]

In [5]:
## Convert songs_rdd into a list of tuples
songs_tuple_rdd = songs_rdd.map(lambda x : (x[0], x[1]))
songs_tuple_rdd.take(1)

[('SOAAADD12AB018A9DD', '1')]

In [6]:
## Join songs_tuple_rdd & reordered_triplet_rdd -> make sure the left table is songs_tuple_rdd!!
joined_rdd = songs_tuple_rdd.leftOuterJoin(reordered_triplet_rdd)
joined_rdd.take(3)

[('SOAAALJ12AB01828B4', ('12', None)),
 ('SOAAAQN12AB01856D3',
  ('16', ['32cf63cf65787ce7e72fc7fda6ee585979af6582', '1'])),
 ('SOAAAQN12AB01856D3',
  ('16', ['d1b88940eabd4fab860edf68f0f0842e4f902c78', '1']))]

In [7]:
## Get rid of the first index x[0] (song name) and replace it with song index (id)
replaced_rdd = joined_rdd.map(lambda x : (x[1][0], x[1][1]))
replaced_rdd.take(2)

[('12', None), ('16', ['32cf63cf65787ce7e72fc7fda6ee585979af6582', '1'])]

In [8]:
## Filter for songs that do not have user rating
no_rating_rdd = replaced_rdd.filter(lambda x : x[1] is None )
print("There are %d songs that do not have any ratings." % (no_rating_rdd.count()))

There are 223007 songs that do not have any ratings.


## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate the rating for all the songs. You may notice that based on all songs, the rating is almost always very low. So, think of the best way to convert song count to ratings. (Hint: Try generating ratings based on each user's song play history)

In [9]:
from operator import add

In [10]:
## Read user file into RDD
users_rdd = sc.textFile(r"kaggle_users.txt") \
    .map(lambda line: line.split("\n")) 

users_rdd.take(1)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d']]

In [11]:
## Make user ID the key for triplet RDD
triplet_user_rdd = triplet_rdd.map(lambda x : (x[0], int(x[2])))
triplet_user_rdd.take(1)

[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1)]

In [12]:
## Reduce By Key to add the number of plays (x[2]), for each user
sum_rdd = triplet_user_rdd.reduceByKey(add)
sum_rdd.take(3)

[('d7083f5e1d50c264277d624340edaaf3dc16095b', 17),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4', 6),
 ('fdf6afb5daefb42774617cf223475c6013969724', 10)]

In [13]:
## Get rid of rows that do not have a play count
no_none_rdd = replaced_rdd.filter(lambda x : x[1] is not None )
no_none_rdd.take(2)

[('16', ['32cf63cf65787ce7e72fc7fda6ee585979af6582', '1']),
 ('16', ['d1b88940eabd4fab860edf68f0f0842e4f902c78', '1'])]

In [14]:
## Reorder to make username the key
user_key_rdd = no_none_rdd.map(lambda x : (x[1][0], (x[0], x[1][1])))
user_key_rdd.take(2)

[('32cf63cf65787ce7e72fc7fda6ee585979af6582', ('16', '1')),
 ('d1b88940eabd4fab860edf68f0f0842e4f902c78', ('16', '1'))]

In [15]:
## Join sum_rdd with user_key_rdd 
joined_user_sum = sum_rdd.leftOuterJoin(user_key_rdd)
joined_user_sum.take(2)
## (username, (number of times that user played songs, (song ID, number of times that song was played)))

[('6530c4fc41b9110de5d39fe0355fa103c66385f0', (44, ('36681', '5'))),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', (44, ('75497', '8')))]

In [16]:
## Divide # of times that song was played by total number of times a user played all songs
rating_rdd = joined_user_sum.map(lambda x : (x[0], x[1][1][0], int(x[1][1][1])/int(x[1][0])))
rating_rdd.take(2)

[('6530c4fc41b9110de5d39fe0355fa103c66385f0', '36681', 0.11363636363636363),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', '75497', 0.18181818181818182)]

In [17]:
## Normalize the ratings
import math
normalized_rdd = rating_rdd.map(lambda x : (x[0], x[1], 10+math.log(x[2])))

normalized_rdd.take(5)

[('6530c4fc41b9110de5d39fe0355fa103c66385f0', '36681', 7.825248278515839),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', '75497', 8.295251907761575),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', '245936', 7.3144226547498485),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', '383719', 6.2158103660817385),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', '129546', 8.161720515137052)]

## Step 3: 
For a given user_id, rating, recommend 5 other songs from the list. One way to do this is based on another user who liked the same song liked by this user with rating more than the given rating and recommend the 5 songs based on the matched user's rating. 

In [18]:
## Inputs
givenUserId = '6530c4fc41b9110de5d39fe0355fa103c66385f0';
givenSongId = '36681';
givenRating = 7.825248278515839;

In [19]:
## Reformat normalized_rdd
reformatted_rdd = normalized_rdd.map(lambda x : (x[0], [x[1], x[2]]))
reformatted_rdd.take(2)

[('6530c4fc41b9110de5d39fe0355fa103c66385f0', ['36681', 7.825248278515839]),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', ['75497', 8.295251907761575])]

In [20]:
## Function to find users that liked the same song more than the given user
def liked(col):
    if ((col[0] != '') and (col[1][0] == givenSongId) and (float(col[1][1]) > givenRating)): #does the movie id match, rating higher?
        return True
    return False

## We just need the users that have liked that song more than given user
matched_users_rdd = reformatted_rdd.filter(liked)
matched_users_rdd.take(2)

## These are the users that we are focusing on - these users like the song more than given user

[('6385fa76b90e95f49909701b5effcdc17ea3bc0a', ['36681', 8.208240530771945]),
 ('af71c172969f8918b58b7d160177e5e332332bc5', ['36681', 8.3905620875659])]

In [21]:
## Get rid of song idex - same song
just_users_rdd = matched_users_rdd.map(lambda x : (x[0], x[1][1]) )
just_users_rdd.take(2)

[('6385fa76b90e95f49909701b5effcdc17ea3bc0a', 8.208240530771945),
 ('af71c172969f8918b58b7d160177e5e332332bc5', 8.3905620875659)]

In [22]:
## Find all the songs that the users have listened to by joining
## Inner join between reformatted_rdd and this user list - the user name needs to match with the just_users_rdd
## Get other songs that that user has listened to!
potential_songs = reformatted_rdd.join(just_users_rdd)
potential_songs.take(2)

[('6385fa76b90e95f49909701b5effcdc17ea3bc0a',
  (['36681', 8.208240530771945], 8.208240530771945)),
 ('6385fa76b90e95f49909701b5effcdc17ea3bc0a',
  (['288097', 8.208240530771945], 8.208240530771945))]

In [23]:
## Reformat the potential songs
reformatted_potential_rdd = potential_songs.map(lambda x : (x[1][0][0], [x[0], x[1][0][1]]))
reformatted_potential_rdd.take(2)

[('36681', ['6385fa76b90e95f49909701b5effcdc17ea3bc0a', 8.208240530771945]),
 ('288097', ['6385fa76b90e95f49909701b5effcdc17ea3bc0a', 8.208240530771945])]

In [24]:
## Function to exclude the same song as given song, because right now in our list there is also our given song
def notTheGivenSong(row):
    if(row[0] != givenSongId):
        return True
    return False

songs_without_given = reformatted_potential_rdd.filter(notTheGivenSong)
songs_without_given.take(2)

[('288097', ['6385fa76b90e95f49909701b5effcdc17ea3bc0a', 8.208240530771945]),
 ('385781', ['6385fa76b90e95f49909701b5effcdc17ea3bc0a', 8.208240530771945])]

In [25]:
## Drop the users from RDD
songs_without_users = songs_without_given.map(lambda x : (x[0], float(x[1][1])))
songs_without_users.takeOrdered(5, key=lambda x: -x[1])

[('217663', 9.393864196429684),
 ('248603', 9.21154263963573),
 ('240580', 9.18906978378367),
 ('343755', 9.152702139612796),
 ('302439', 9.083709268125846)]

In [26]:
## Aggregate the ratings for each song, and take the top 5 songs with the highest rating
songs_aggregated = songs_without_users.reduceByKey(add) 
songs_aggregated.takeOrdered(5, key=lambda x: -x[1])

[('328524', 23.75002475774052),
 ('215861', 15.030186700423998),
 ('286747', 14.96304739758637),
 ('36622', 14.36521039683075),
 ('242113', 13.750990474860476)]

In [27]:
## Find the song names that match the top 5 song indexes
## Reformat our songs_tuple to have the song index as key
songs_tuple_reordered = songs_tuple_rdd.map(lambda x : (x[1], x[0]))
songs_tuple_reordered.take(2)

[('1', 'SOAAADD12AB018A9DD'), ('2', 'SOAAADE12A6D4F80CC')]

In [28]:
## Join the song_tuple with our top 5 songs rdd
song_names = songs_aggregated.join(songs_tuple_reordered)
song_names.take(2)

[('281084', (8.3905620875659, 'SOSJRXV12A8C136E1B')),
 ('123485', (7.227411277760218, 'SOHTEDD12A6D4F8215'))]

In [29]:
## Reformat song_names to drop song index, and include just the song rating
song_names_reformatted = song_names.map(lambda x : (x[1][1], x[1][0]))
song_names_reformatted.take(2)

[('SOSJRXV12A8C136E1B', 8.3905620875659),
 ('SOHTEDD12A6D4F8215', 7.227411277760218)]

In [30]:
## Take the top five from the song names list
song_names_reformatted.takeOrdered(5, key=lambda x : -x[1])

[('SOVTLQW12AB0186641', 23.75002475774052),
 ('SONVPTP12A6D4F7A34', 15.030186700423998),
 ('SOSUADS12A58A80A47', 14.96304739758637),
 ('SOCGXXL12B0B808865', 14.36521039683075),
 ('SOPQJAZ12A6310F168', 13.750990474860476)]

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If the top-5 user set has an user appearing more than once, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list. 

In [31]:
usergrouped = normalized_rdd.map(lambda x: (x[0],[x[1],x[2]]))
userratings = usergrouped.groupByKey().map(lambda x: (x[0],list(x[1])))
userratings.collect()

[('6530c4fc41b9110de5d39fe0355fa103c66385f0',
  [['36681', 7.825248278515839],
   ['75497', 8.295251907761575],
   ['245936', 7.3144226547498485],
   ['383719', 6.2158103660817385],
   ['129546', 8.161720515137052],
   ['253868', 6.2158103660817385],
   ['280679', 6.908957546641684],
   ['294531', 6.2158103660817385],
   ['357202', 6.2158103660817385],
   ['346138', 6.908957546641684],
   ['165401', 6.908957546641684],
   ['176139', 6.2158103660817385],
   ['275127', 6.908957546641684],
   ['242151', 6.908957546641684],
   ['327050', 6.2158103660817385],
   ['334240', 7.825248278515839]]),
 ('5a68f7886f7e778490c6f13807039ff4152bcd62',
  [['340434', 8.295251907761575],
   ['299176', 7.602104727201629],
   ['93787', 7.602104727201629],
   ['318282', 8.98839908832152],
   ['180600', 7.602104727201629],
   ['286878', 8.295251907761575]]),
 ('6493c305190b52657d4ea3f4adf367ffcf3427af',
  [['37903', 5.4782114229509595],
   ['65932', 5.4782114229509595],
   ['117573', 5.4782114229509595],
   [

In [32]:
dataprepRDD = userratings.map(lambda x: (1,x))
dataprepRDD.take(3)

[(1,
  ('6530c4fc41b9110de5d39fe0355fa103c66385f0',
   [['36681', 7.825248278515839],
    ['75497', 8.295251907761575],
    ['245936', 7.3144226547498485],
    ['383719', 6.2158103660817385],
    ['129546', 8.161720515137052],
    ['253868', 6.2158103660817385],
    ['280679', 6.908957546641684],
    ['294531', 6.2158103660817385],
    ['357202', 6.2158103660817385],
    ['346138', 6.908957546641684],
    ['165401', 6.908957546641684],
    ['176139', 6.2158103660817385],
    ['275127', 6.908957546641684],
    ['242151', 6.908957546641684],
    ['327050', 6.2158103660817385],
    ['334240', 7.825248278515839]])),
 (1,
  ('5a68f7886f7e778490c6f13807039ff4152bcd62',
   [['340434', 8.295251907761575],
    ['299176', 7.602104727201629],
    ['93787', 7.602104727201629],
    ['318282', 8.98839908832152],
    ['180600', 7.602104727201629],
    ['286878', 8.295251907761575]])),
 (1,
  ('6493c305190b52657d4ea3f4adf367ffcf3427af',
   [['37903', 5.4782114229509595],
    ['65932', 5.47821142295095

In [33]:
def theGivenUser(row):
    if(row[0]==givenUserId):
        return True
    return False

joinA = userratings.filter(theGivenUser).map(lambda x: (1, x))
joinA.collect()

[(1,
  ('6530c4fc41b9110de5d39fe0355fa103c66385f0',
   [['36681', 7.825248278515839],
    ['75497', 8.295251907761575],
    ['245936', 7.3144226547498485],
    ['383719', 6.2158103660817385],
    ['129546', 8.161720515137052],
    ['253868', 6.2158103660817385],
    ['280679', 6.908957546641684],
    ['294531', 6.2158103660817385],
    ['357202', 6.2158103660817385],
    ['346138', 6.908957546641684],
    ['165401', 6.908957546641684],
    ['176139', 6.2158103660817385],
    ['275127', 6.908957546641684],
    ['242151', 6.908957546641684],
    ['327050', 6.2158103660817385],
    ['334240', 7.825248278515839]]))]

In [34]:
joined = dataprepRDD.join(joinA)
joined.collect()

[(1,
  (('6530c4fc41b9110de5d39fe0355fa103c66385f0',
    [['36681', 7.825248278515839],
     ['75497', 8.295251907761575],
     ['245936', 7.3144226547498485],
     ['383719', 6.2158103660817385],
     ['129546', 8.161720515137052],
     ['253868', 6.2158103660817385],
     ['280679', 6.908957546641684],
     ['294531', 6.2158103660817385],
     ['357202', 6.2158103660817385],
     ['346138', 6.908957546641684],
     ['165401', 6.908957546641684],
     ['176139', 6.2158103660817385],
     ['275127', 6.908957546641684],
     ['242151', 6.908957546641684],
     ['327050', 6.2158103660817385],
     ['334240', 7.825248278515839]]),
   ('6530c4fc41b9110de5d39fe0355fa103c66385f0',
    [['36681', 7.825248278515839],
     ['75497', 8.295251907761575],
     ['245936', 7.3144226547498485],
     ['383719', 6.2158103660817385],
     ['129546', 8.161720515137052],
     ['253868', 6.2158103660817385],
     ['280679', 6.908957546641684],
     ['294531', 6.2158103660817385],
     ['357202', 6.21581036

In [35]:
def combineRatings(row):
    list1 = row[1][0][1]
    list2 = row[1][1][1]
    
    dict1 = {}
    for song in list1:
        dict1[song[0]] = []
        dict1[song[0]].append(song[1])
        dict1[song[0]].append(0)
    
    for song in list2:
        if song[0] not in dict1:
            dict1[song[0]] = []
            dict1[song[0]].append(0)
            dict1[song[0]].append(song[1])
        else:
            dict1[song[0]][1] = song[1]
    
    return (row[1][0][0], dict1)

joined2 = joined.map(lambda row: combineRatings(row))
joined2.collect()

[('6530c4fc41b9110de5d39fe0355fa103c66385f0',
  {'36681': [7.825248278515839, 7.825248278515839],
   '75497': [8.295251907761575, 8.295251907761575],
   '245936': [7.3144226547498485, 7.3144226547498485],
   '383719': [6.2158103660817385, 6.2158103660817385],
   '129546': [8.161720515137052, 8.161720515137052],
   '253868': [6.2158103660817385, 6.2158103660817385],
   '280679': [6.908957546641684, 6.908957546641684],
   '294531': [6.2158103660817385, 6.2158103660817385],
   '357202': [6.2158103660817385, 6.2158103660817385],
   '346138': [6.908957546641684, 6.908957546641684],
   '165401': [6.908957546641684, 6.908957546641684],
   '176139': [6.2158103660817385, 6.2158103660817385],
   '275127': [6.908957546641684, 6.908957546641684],
   '242151': [6.908957546641684, 6.908957546641684],
   '327050': [6.2158103660817385, 6.2158103660817385],
   '334240': [7.825248278515839, 7.825248278515839]}),
 ('5a68f7886f7e778490c6f13807039ff4152bcd62',
  {'340434': [8.295251907761575, 0],
   '29917

In [36]:
def filterOutZeroCos(row):
    ratingsDict = row[1]
    for value in ratingsDict.values():
        if (value[0] != 0 and value[1] != 0):
            return True
    return False

filteredPairs = joined2.filter(filterOutZeroCos)
filteredPairs.collect()

[('6530c4fc41b9110de5d39fe0355fa103c66385f0',
  {'36681': [7.825248278515839, 7.825248278515839],
   '75497': [8.295251907761575, 8.295251907761575],
   '245936': [7.3144226547498485, 7.3144226547498485],
   '383719': [6.2158103660817385, 6.2158103660817385],
   '129546': [8.161720515137052, 8.161720515137052],
   '253868': [6.2158103660817385, 6.2158103660817385],
   '280679': [6.908957546641684, 6.908957546641684],
   '294531': [6.2158103660817385, 6.2158103660817385],
   '357202': [6.2158103660817385, 6.2158103660817385],
   '346138': [6.908957546641684, 6.908957546641684],
   '165401': [6.908957546641684, 6.908957546641684],
   '176139': [6.2158103660817385, 6.2158103660817385],
   '275127': [6.908957546641684, 6.908957546641684],
   '242151': [6.908957546641684, 6.908957546641684],
   '327050': [6.2158103660817385, 6.2158103660817385],
   '334240': [7.825248278515839, 7.825248278515839]}),
 ('8d3d5864ec55e1bb0438436e39d8eba4b4043e79',
  {'177810': [6.029708086447878, 0],
   '37145

In [37]:
def separateRatingLists(row):
    list1 = []
    list2 = []
    
    for pair in row[1].values():
        list1.append(pair[0])
        list2.append(pair[1])
    return (row[0], list1, list2)

filteredRDD = filteredPairs.map(lambda x: separateRatingLists(x))
filteredRDD.collect()

[('6530c4fc41b9110de5d39fe0355fa103c66385f0',
  [7.825248278515839,
   8.295251907761575,
   7.3144226547498485,
   6.2158103660817385,
   8.161720515137052,
   6.2158103660817385,
   6.908957546641684,
   6.2158103660817385,
   6.2158103660817385,
   6.908957546641684,
   6.908957546641684,
   6.2158103660817385,
   6.908957546641684,
   6.908957546641684,
   6.2158103660817385,
   7.825248278515839],
  [7.825248278515839,
   8.295251907761575,
   7.3144226547498485,
   6.2158103660817385,
   8.161720515137052,
   6.2158103660817385,
   6.908957546641684,
   6.2158103660817385,
   6.2158103660817385,
   6.908957546641684,
   6.908957546641684,
   6.2158103660817385,
   6.908957546641684,
   6.908957546641684,
   6.2158103660817385,
   7.825248278515839]),
 ('8d3d5864ec55e1bb0438436e39d8eba4b4043e79',
  [6.029708086447878,
   7.975618235503191,
   8.514614736235878,
   7.1283203751159885,
   6.029708086447878,
   7.1283203751159885,
   6.029708086447878,
   6.029708086447878,
   6.0297

In [38]:
from sklearn.metrics.pairwise import cosine_similarity

def cosinSimilarity(x):
    cosine = cosine_similarity([x[1],x[2]])
    return cosine[0][1]

def notTheUser(y):
    if(y[0] != givenUserId):
        return True
    return False

cosRDD = filteredRDD.map(lambda row: (row[0], cosinSimilarity(row))).filter(notTheUser)
cosRDD.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 134.0 failed 1 times, most recent failure: Lost task 4.0 in stage 134.0 (TID 252, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\worker.py", line 364, in main
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\worker.py", line 69, in read_command
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\metrics\__init__.py", line 7, in <module>
    from .ranking import auc
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in <module>
    from ..preprocessing import label_binarize
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\preprocessing\__init__.py", line 6, in <module>
    from ._function_transformer import FunctionTransformer
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\preprocessing\_function_transformer.py", line 5, in <module>
    from ..utils.testing import assert_allclose_dense_sparse
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in <module>
    import pytest
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\pytest.py", line 6, in <module>
    from _pytest.assertion import register_assert_rewrite
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\assertion\__init__.py", line 6, in <module>
    from _pytest.assertion import rewrite
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\assertion\rewrite.py", line 20, in <module>
    from _pytest.assertion import util
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\assertion\util.py", line 5, in <module>
    import _pytest._code
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\_code\__init__.py", line 2, in <module>
    from .code import Code  # noqa
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\_code\code.py", line 11, in <module>
    import pluggy
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\pluggy\__init__.py", line 16, in <module>
    from .manager import PluginManager, PluginValidationError
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\pluggy\manager.py", line 6, in <module>
    import importlib_metadata
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 466, in <module>
    __version__ = version(__name__)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 433, in version
    return distribution(package).version
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 406, in distribution
    return Distribution.from_name(package)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 176, in from_name
    dist = next(dists, None)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 362, in <genexpr>
    for path in map(cls._switch_path, paths)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 377, in _search_path
    if not root.is_dir():
  File "C:\Users\Jeeyoung\Anaconda3\lib\pathlib.py", line 1351, in is_dir
    return S_ISDIR(self.stat().st_mode)
  File "C:\Users\Jeeyoung\Anaconda3\lib\pathlib.py", line 1161, in stat
    return self._accessor.stat(self)
OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect: 'C:\\C:\\apachespark\\jars\\spark-core_2.11-2.4.4.jar'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\worker.py", line 364, in main
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\worker.py", line 69, in read_command
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "C:\apachespark\python\lib\pyspark.zip\pyspark\serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\metrics\__init__.py", line 7, in <module>
    from .ranking import auc
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in <module>
    from ..preprocessing import label_binarize
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\preprocessing\__init__.py", line 6, in <module>
    from ._function_transformer import FunctionTransformer
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\preprocessing\_function_transformer.py", line 5, in <module>
    from ..utils.testing import assert_allclose_dense_sparse
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in <module>
    import pytest
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\pytest.py", line 6, in <module>
    from _pytest.assertion import register_assert_rewrite
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\assertion\__init__.py", line 6, in <module>
    from _pytest.assertion import rewrite
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\assertion\rewrite.py", line 20, in <module>
    from _pytest.assertion import util
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\assertion\util.py", line 5, in <module>
    import _pytest._code
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\_code\__init__.py", line 2, in <module>
    from .code import Code  # noqa
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\_pytest\_code\code.py", line 11, in <module>
    import pluggy
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\pluggy\__init__.py", line 16, in <module>
    from .manager import PluginManager, PluginValidationError
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\pluggy\manager.py", line 6, in <module>
    import importlib_metadata
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 466, in <module>
    __version__ = version(__name__)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 433, in version
    return distribution(package).version
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 406, in distribution
    return Distribution.from_name(package)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 176, in from_name
    dist = next(dists, None)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 362, in <genexpr>
    for path in map(cls._switch_path, paths)
  File "C:\Users\Jeeyoung\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 377, in _search_path
    if not root.is_dir():
  File "C:\Users\Jeeyoung\Anaconda3\lib\pathlib.py", line 1351, in is_dir
    return S_ISDIR(self.stat().st_mode)
  File "C:\Users\Jeeyoung\Anaconda3\lib\pathlib.py", line 1161, in stat
    return self._accessor.stat(self)
OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect: 'C:\\C:\\apachespark\\jars\\spark-core_2.11-2.4.4.jar'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
from sklearn.metrics.pairwise import cosine_similarity

a = cosine_similarity([[1,2,3,4],[2,3,4,5]])
print(a)

In [None]:
topCosines = cosRDD.takeOrdered(5, key = lambda x: -x[1])
topCosines

In [None]:
## otherUsers_normalized_ratings is ('customername',('songID',rating)),
otherUsers_normalized_ratings = normalized_rdd.map(lambda x: (x[0],(x[1],x[2])))
otherUsers_normalized_ratings.take(3)

In [None]:
topCosinesRDD = sc.parallelize(topCosines)
songsOfTop5 = topCosinesRDD.join(otherUsers_normalized_ratings).map(lambda x: (x[0], x[1][1]))
## otherUsers_normalized_ratings is ('customername',('songID',rating)),


songsOfTop5.collect()

In [None]:
justTheSongs = songsOfTop5.map(lambda x: x[1])
justTheSongs.collect()

In [None]:
aggregatedSongs1 = justTheSongs.groupByKey().map(lambda x: (x[0], list(x[1])))
aggregatedSongs2 = aggregatedSongs1.map(lambda x: (x[0], (sum(x[1])/len(x[1]))))
aggregatedSongs2.collect()

In [None]:
tempTest = justTheSongs.groupByKey()
tempTest.take(2)

In [None]:
top5Songs = aggregatedSongs2.takeOrdered(5, key = lambda x: -x[1])
top5Songs