# <p style="text-align: center;">MIS 285N: Big Data and Distributed Programming</p>
# <p style="text-align: center;">Project - 1 : Apache Spark</p>
## <p style="text-align: center;">Instructor: Dr. Ramesh Yerraballi</p>
## <p style="text-align: center;">Due: Tuesday, September 14th submitted via Canvas by 11:59 pm</p>

Your work should be written in a **Jupyter notebook**.   

Also, please make sure your code runs in your notebook before submitting.

**Note:**

This project is based on the MapReduce framework. You will work with Apache Spark and learn how Spark works, what functionality it provides, what the MapReduce framework does, and why it is useful.

In this project you will implement a basic song recommender system. You will be given a dataset made up of multiple csv files, which contain song play counts and song information.

The data you will use is provided in a zip file along with this notebook. The __msd.zip__ archive contains:
1. **'kaggle_visible_evaluation_triplets.txt'**: the visible part of the testing data, which we will use to understand how Apache Spark works. Each user's listening history is provided as (user, song, play count) triplets.
2. **'kaggle_songs.txt'**: each song is assigned an index for easier representation of songs.
3. **'kaggle_users.txt'**: the canonical list of user identifiers.
4. **'MSDChallengeGettingstarted.pdf'**: use this as your reference.



### **What to turn in?**  

A zip folder containing:
1. Your Jupyter notebook.
2. A brief report in PDF format describing the features you used for recommendation, with a brief explanation of the flow of your code (for example, what each RDD does and why it was created).
3. A datasets folder with the csv files you are using in your notebook.
4. The notebook should use relative paths to the csv files in the datasets folder.
5. Name the zip folder `<your_name>_<your_partner_name>.zip`.

This project consists of 4 questions:  

1. Create an RDD from _kaggle_visible_evaluation_triplets.txt_ and replace the song name with the song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating.
2. Generate song ratings based on the song play count, as a normalized score between 0 and 1.
3. Identify popular songs based on this rating and, given a user ID, recommend songs to that user using the algorithm from the movie recommender system covered in class.
4. Using a cosine similarity function, compute the pairwise similarity between users and generate the top 5 most similar users, with no user appearing more than once.

The list above gives a high-level overview of the questions.

In [203]:
### Starter code ####
#### These lines tell Jupyter where to find Apache Spark ####
import findspark
findspark.init('/users/domitillechambon/spark-3.3.0-bin-hadoop3/')
from pyspark import SparkConf, SparkContext

# Run Spark locally, using all available cores
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext.getOrCreate(conf = conf)

In [None]:
#sc.stop()

In [204]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

## Step 1: 
Replace song name with song index and identify the number of songs without user history

In [205]:
# Reading in songs file into RDD
song_rdd = sc.textFile(r"kaggle_songs.txt") \
    .map(lambda line: line.split(" ")) 

In [206]:
## SWITCHING SONG NAME FOR ITS INDEX
# Reformatting the triplet and song RDDs
tempTriplet = triplet_rdd.map(lambda x: (x[1], [x[0], x[2]]))
tempSong = song_rdd.map(lambda x: (x[0], x[1]))

# Merging the reformatted RDDs
tempMerged = tempTriplet.join(tempSong)

# Reformatting the merge
updatedTriplet = tempMerged.map(lambda x: (x[1][0][0], x[1][1], x[1][0][1]))

In [208]:
# FINDING SONGS THAT HAVE NO RATINGS
# Creating distinct list of songs users have interacted with and list of all songs
userSongs = updatedTriplet.map(lambda x: x[1]).distinct()
allSongs = song_rdd.map(lambda x: x[1])

# Calculating number of songs with no ratings
songsNoRatings = allSongs.subtract(userSongs).count()
print(songsNoRatings, "songs don't have a rating.")


223007 songs don't have a rating.
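
The join-and-subtract logic of this step can be checked on a tiny in-memory sample before running it on the full files. The following is a minimal plain-Python sketch (no Spark required). The sample rows and the helper names `replace_song_ids` and `count_unplayed` are invented for illustration and are not part of the dataset or the starter code.

```python
# Toy stand-ins for the two input files (values are invented for illustration).
triplets = [
    ("user_a", "SONG_X", "3"),
    ("user_a", "SONG_Y", "1"),
    ("user_b", "SONG_X", "2"),
]
songs = [("SONG_X", "1"), ("SONG_Y", "2"), ("SONG_Z", "3")]


def replace_song_ids(triplets, songs):
    """Swap each song ID for its index, mirroring an inner join on song ID."""
    index_of = dict(songs)
    return [
        (user, index_of[song], count)
        for user, song, count in triplets
        if song in index_of
    ]


def count_unplayed(updated, songs):
    """Count song indices that never appear in any triplet."""
    played = {index for _, index, _ in updated}
    all_indices = {index for _, index in songs}
    return len(all_indices - played)


updated = replace_song_ids(triplets, songs)
unplayed = count_unplayed(updated, songs)
print(updated)
print(unplayed, "songs don't have a rating.")
```

In the toy data only `SONG_Z` is never played, so the set difference contains a single index. The same idea scales to the RDD version, where `join` plays the role of the dictionary lookup and `subtract` plays the role of the set difference.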



## Step 2:
Generate song ratings based on the play_count. For example, if song_1 is played 5 times, song_2 is played 10 times, and song_3 is played 5 times, the normalized rating scores should be 0.25, 0.5, and 0.25 respectively.
Similarly, generate ratings for all the songs. You may notice that when normalizing over all songs, the rating is almost always very low, so think about the best way to convert play counts to ratings. (Hint: try generating ratings based on each user's song play history.)
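
The arithmetic in the example above can be sketched in a few lines of plain Python. This is an illustration only: the function name `normalize_play_counts` and the `history` sample are hypothetical, and the per-user variant simply follows the hint by dividing each play count by that user's total.

```python
def normalize_play_counts(play_counts):
    """Turn {song: play_count} into {song: share of total plays}."""
    total = sum(play_counts.values())
    return {song: count / total for song, count in play_counts.items()}


# The example from the text: 5, 10 and 5 plays out of 20 in total.
ratings = normalize_play_counts({"song_1": 5, "song_2": 10, "song_3": 5})
print(ratings)

# Per-user normalization: each user's ratings sum to 1,
# so a heavy listener does not drown out everyone else.
history = {
    "user_a": {"song_1": 1, "song_2": 3},
    "user_b": {"song_1": 4},
}
per_user = {user: normalize_play_counts(counts) for user, counts in history.items()}
print(per_user)
```

Normalizing per user keeps every rating on the same 0 to 1 scale regardless of how many songs exist overall, which is why the global version produces such small numbers.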

In [209]:
# Reading in users file into RDD
user_rdd = sc.textFile(r"kaggle_users.txt") \
    .map(lambda line: line.split(" "))

# Isolating the user IDs (first field of each line)
allUsers = user_rdd.map(lambda x: x[0])

In [210]:
## MAKE UPDATED TRIPLET INTO A DICTIONARY WITH LIST OF LISTS
# Formatting UpdatedTriplet to be like a dictionary
utDict = updatedTriplet.map(lambda x: (x[0], [x[1], x[2]]))

# Grouped by users
groupedTriplet = utDict.groupByKey().map(lambda x: (x[0], list(x[1])))

In [211]:
## CALCULATE RATINGS
# Attach each user's total play count to every [song, play count] entry
def sumPlayCount(line):
    user, songs = line
    totalPlays = sum(int(count) for _, count in songs)
    return user, [[song, int(count), totalPlays] for song, count in songs]

# Convert each play count into that user's share of total plays (a rating between 0 and 1)
def rating(line):
    user, songs = line
    return user, [[song, round(count / total, 4)] for song, count, total in songs]

tripletSum = groupedTriplet.map(sumPlayCount)
tripletRating = tripletSum.map(rating)

## Step 3: 
For a given user_id (choose one yourself) and rating, recommend 5 other songs from the list. One way to do this is to find another user who liked a song that this user liked, with a rating higher than the given rating, and recommend 5 songs based on the matched user's ratings.
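
One possible reading of this approach is sketched below in plain Python on invented data. It is a hedged illustration, not the algorithm from class: the function name `recommend`, the `threshold` parameter, the choice of "at least the threshold" for a match, and the sample ratings are all assumptions made for the example.

```python
def recommend(ratings, user, threshold, top_n=5):
    """Recommend songs liked by users who share this user's top song.

    ratings: {user: {song: rating}}; threshold: minimum rating for a match.
    """
    own = ratings[user]
    top_song = max(own, key=own.get)  # the user's highest-rated song

    candidates = {}
    for other, other_ratings in ratings.items():
        # Skip the user themselves and anyone who did not rate the top song highly enough.
        if other == user or other_ratings.get(top_song, 0.0) < threshold:
            continue
        for song, r in other_ratings.items():
            if song not in own:  # never recommend songs already heard
                candidates.setdefault(song, []).append(r)

    # Average the ratings when several matched users rated the same song.
    averaged = {song: sum(rs) / len(rs) for song, rs in candidates.items()}
    ranked = sorted(averaged, key=lambda s: (-averaged[s], s))
    return ranked[:top_n]


sample = {
    "u1": {"s1": 0.6, "s2": 0.4},
    "u2": {"s1": 0.5, "s3": 0.3, "s4": 0.2},
    "u3": {"s1": 0.1, "s5": 0.9},
    "u4": {"s2": 0.5, "s6": 0.5},
}
picks = recommend(sample, "u1", threshold=0.5)
print(picks)
```

Here only `u2` rates `u1`'s top song (`s1`) at or above the threshold, so the recommendations come from `u2`'s remaining songs, ordered by rating.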

In [213]:
## CHOOSE USER AND FIND AVERAGE RATING OF SONGS THEY LISTEN TO
# Calculates the average rating of the songs for the specified user
def avgRating(user):
    narrowRatings = [r for _, r in user[1]]
    return user[0], round(sum(narrowRatings) / len(narrowRatings), 4)

# Keeps only the song(s) with the highest rating in the specified user's song list
def maxRating(user):
    highest = max(r for _, r in user[1])
    return user[0], [entry for entry in user[1] if entry[1] == highest]

# User we selected
selectedUser = "e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5"

# Rating threshold: a song must be rated at least the average rating of the songs the user has listened to
# (collect() returns a one-element list, read below as userRating[0])
userRating = tripletRating.filter(lambda x: x[0] == selectedUser) \
                        .map(avgRating) \
                        .map(lambda x: x[1]).collect()

# User's most listened to song, i.e. the song with the highest rating (read below as topSong[0])
topSong = tripletRating.filter(lambda x: x[0] == selectedUser) \
                        .map(maxRating) \
                        .map(lambda x: x[1][0][0]).collect()


In [224]:
## CREATE RECOMMENDATIONS FOR USER
# Pulls all other users who listened to the selected user's top song with a rating ≥ the user's preferred rating
# (checks every song in each user's list, not only the first entry)
usersWithTopSong = tripletRating.filter(lambda x: x[0] != selectedUser) \
                                .filter(lambda x: any(song == topSong[0] and r >= userRating[0] for song, r in x[1]))

# Flatten the matched users' song lists into (song, rating) pairs
tempListPotentialSongs = usersWithTopSong.flatMap(lambda x: [(song, r) for song, r in x[1]])

# Remove the user's top song from the list and drop songs below the rating threshold
listPotentialSongs = tempListPotentialSongs.filter(lambda x: x[0] != topSong[0]) \
                                            .filter(lambda x: x[1] >= userRating[0])

# Create a distinct list of songs, each with the list of ratings it received
temp = listPotentialSongs.groupByKey().map(lambda x: [x[0], list(x[1])])

# Average the ratings of songs that received more than one
distinctSongsWithRating = temp.map(lambda x: [x[0], round(sum(x[1]) / len(x[1]), 4)])

# Remove songs with an average rating below the rating threshold
songsAboveRating = distinctSongsWithRating.filter(lambda x: x[1] >= userRating[0]).collect()

# Create the set of songs the user has already listened to
userSongs = set(tripletRating.filter(lambda x: x[0] == selectedUser)
                            .flatMap(lambda x: [song for song, r in x[1]]).collect())

# Remove songs the user has already listened to
# (builds a new list instead of removing items from the list being iterated over)
songsAboveRating = [line for line in songsAboveRating if line[0] not in userSongs]

# Sort the final recommendations list in order of highest song rating to lowest
sortedRecs = sorted(songsAboveRating, key= lambda rating: rating[1], reverse= True)

# Keep the top 5 songs from the sortedRecs list
recommendations = [songs[0] for songs in sortedRecs[0:5]]

# Print the final recommendation
print("The recommendations for", selectedUser, "are:", recommendations)


## Step 4: 
1. Compute the cosine similarity between all pairs of users.
2. Sort the similarity scores and print the top 5 similar users.
3. If a user appears more than once in the top-5 set, ignore that pair and take the next best pair from the sorted list.
4. For a given user_id, identify the top 5 similar users and derive song recommendations from those users' lists.
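
As a reference point for item 1, cosine similarity treats each user as a sparse vector of song ratings and divides the dot product by the product of the vector lengths. The following is a minimal plain-Python sketch on invented data; the names `cosine_similarity` and `all_pair_similarities` are hypothetical helpers, not part of the starter code.

```python
import math
from itertools import combinations


def cosine_similarity(a, b):
    """Cosine similarity of two sparse rating vectors given as {song: rating}."""
    shared = set(a) & set(b)  # only shared songs contribute to the dot product
    dot = sum(a[s] * b[s] for s in shared)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def all_pair_similarities(ratings):
    """Score every unordered pair of users, highest similarity first."""
    pairs = [
        ((u, v), cosine_similarity(ratings[u], ratings[v]))
        for u, v in combinations(sorted(ratings), 2)
    ]
    return sorted(pairs, key=lambda p: -p[1])


sample = {
    "u1": {"s1": 0.5, "s2": 0.5},
    "u2": {"s1": 1.0},
    "u3": {"s3": 1.0},
}
scored = all_pair_similarities(sample)
for pair, score in scored:
    print(pair, round(score, 4))
```

Scoring every pair is quadratic in the number of users, so on the full dataset it is worth restricting the comparison to users who share at least one song; pairs with no song in common have a similarity of 0 anyway.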

In [223]:
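## COMPUTING COSINE SIMILARITY BETWEEN ALL PAIRS OF USERS
# Collect each user's list of song indices to the driver (a plain Python list, not an RDD)
userSongsList = groupedTriplet.map(lambda x: [x[0], [y[0] for y in x[1]]]).collect()

# All song indices from the songs file
uniqueSongs = song_rdd.map(lambda x: x[1])
uniqueSongs.take(5)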
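
Item 3 of this step (skip a pair when one of its users has already been picked) amounts to a greedy pass over the pairs sorted by score. The sketch below assumes the similarity scores have already been computed as `((user_a, user_b), score)` tuples; the function name `top_disjoint_pairs` and the sample scores are invented for illustration.

```python
def top_disjoint_pairs(scored_pairs, k=5):
    """Pick up to k highest-scoring pairs so that no user appears twice."""
    chosen, used = [], set()
    for (u, v), score in sorted(scored_pairs, key=lambda p: -p[1]):
        if u in used or v in used:
            continue  # one of the users is already in a better pair
        chosen.append(((u, v), score))
        used.update((u, v))
        if len(chosen) == k:
            break
    return chosen


scored_pairs = [
    (("a", "b"), 0.9),
    (("a", "c"), 0.8),
    (("c", "d"), 0.7),
    (("b", "d"), 0.6),
    (("e", "f"), 0.5),
]
best = top_disjoint_pairs(scored_pairs, k=2)
print(best)
```

With these scores the pair `("a", "c")` is skipped because `a` is already taken, so the second pick is `("c", "d")`.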
