In [1]:
import os
import pyspark

In [2]:
from pyspark import SparkContext
sc = SparkContext()

### Load data into Spark RDD

In [3]:
# Load user's taste data
taste_file = os.path.join('.','data','subset_taste_profile.csv')
taste_raw_data = sc.textFile(taste_file)
taste_raw_data_header = taste_raw_data.take(1)[0]
taste_data = taste_raw_data.filter(lambda line: line!=taste_raw_data_header)\
            .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),int(tokens[2]))).cache()

In [4]:
# Load song file
song_file = os.path.join('.','data','song_encode_meta.csv')
song_raw_data = sc.textFile(song_file)
song_raw_data_header = song_raw_data.take(1)[0]
song_data = song_raw_data.filter(lambda line: line!=song_raw_data_header)\
           .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

In [5]:
taste_data.count(), song_data.count()

(1000001, 385256)

In [6]:
taste_data.take(5)

[(0, 0, 1), (0, 1, 1), (0, 2, 2), (0, 3, 1), (0, 4, 1)]

In [7]:
song_data.take(5)

[(0, u'The Cove', u'Jack Johnson'),
 (1, u'Nothing from Nothing', u'Billy Preston'),
 (2, u'Entre Dos Aguas', u'Paco De Lucia'),
 (3, u'Under Cold Blue Stars', u'Josh Rouse'),
 (4, u'Riot Radio (Soundtrack Version)', u'The Dead 60s')]

### Split Training/Test dataset

We will randomly split taste_data RDD into two portion (60% + 40%, in this example) and mask playcount the second portion RDD with zero, and then merge it back with the first portion. Hence, we will get training data set with randomly masked playcount. This way we use this processed training dataset to train the model and evaluate the model with test dataset (which in fact the original taste data)

In [9]:
# Count unique songs + Count unique users
songID_with_playcount_RDD = taste_data.map(lambda x: (x[1], x[2])).groupByKey()
userID_with_playcount_RDD = taste_data.map(lambda x: (x[0], x[2])).groupByKey()

In [10]:
songID_with_playcount_RDD.count(), userID_with_playcount_RDD.count()

(148039, 20787)

In [21]:
training_RDD, validation_RDD = taste_data.randomSplit([6, 4], seed=0L)

In [22]:
train_songID_with_playcount_RDD = training_RDD.map(lambda x: (x[1], x[2])).groupByKey()
train_userID_with_playcount_RDD = training_RDD.map(lambda x: (x[0], x[2])).groupByKey()

In [23]:
train_songID_with_playcount_RDD.count(), train_userID_with_playcount_RDD.count()

(117974, 20787)

In [27]:
taste_data.count(), training_RDD.count(), validation_RDD.count()

(1000001, 600391, 399610)

#### Mask validation_RDD

In [29]:
masked_validation_RDD = validation_RDD.map(lambda x: (x[0], x[1], 0))

In [30]:
masked_validation_RDD.count()

399610

In [31]:
masked_validation_RDD.take(10)

[(0, 0, 0),
 (0, 3, 0),
 (0, 4, 0),
 (0, 6, 0),
 (0, 7, 0),
 (0, 8, 0),
 (0, 9, 0),
 (0, 11, 0),
 (0, 12, 0),
 (0, 13, 0)]

In [32]:
validation_RDD.take(10)

[(0, 0, 1),
 (0, 3, 1),
 (0, 4, 1),
 (0, 6, 2),
 (0, 7, 1),
 (0, 8, 1),
 (0, 9, 1),
 (0, 11, 1),
 (0, 12, 1),
 (0, 13, 5)]

#### Merge masked_validation_RDD with training_RDD to get masked_training_RDD ready for model training

In [34]:
masked_training_RDD = training_RDD.union(masked_validation_RDD)

In [35]:
masked_training_RDD.count()

1000001

In [36]:
masked_train_songID_with_playcount_RDD = masked_training_RDD.map(lambda x: (x[1], x[2])).groupByKey()
masked_train_userID_with_playcount_RDD = masked_training_RDD.map(lambda x: (x[0], x[2])).groupByKey()

In [37]:
masked_train_songID_with_playcount_RDD.count(), masked_train_userID_with_playcount_RDD.count()

(148039, 20787)

In [39]:
# Now we have masked_training_RDD and test_RDD ready for model evaludation
test_RDD = taste_data

### Model Evaluation

In [None]:
'''
1. Train the ALS model with masked training data (masked_training_RDD)
2. Loop for each user 
   2.1 Get list of songs from masked_training_RDD where playcount = 0
   2.2 Predict those songs 
   2.3 Get the actual playcounts of those songs from test_RDD 
   2.3 Calculate score:
       MAP - Get top n recommended songs and compare with actual set of songs
       AUC - prediction score (confidence level) vs actual data (binary)
3. Calculate average score
'''

In [42]:
# Get unrated (unplayed) songs
def get_unplayed_songs_for_users(data, user_id):
# Get pairs of (userID, movieID) for user_id unrated movies
    user_unplayed_songs_RDD = data.filter(lambda rating: not rating[0] == user_id)\
                                          .map(lambda x: (user_id, x[1])).distinct()
    return user_unplayed_songs_RDD

In [54]:
m_user_unplay_RDD = get_unplayed_songs_for_users(masked_training_RDD, 12)
user_unplay_RDD = get_unplayed_songs_for_users(taste_data, 12)

In [55]:
user_unplay_RDD.count(), m_user_unplay_RDD.count()

(148037, 148037)

In [56]:
user_unplay_RDD.take(5)

[(12, 57698), (12, 9460), (12, 147210), (12, 30798), (12, 113440)]

In [None]:
user_unplay