### Project 4: Build a recommendation algorithm based on user listening data from Autoscrobbler

Based on code from here: https://github.com/amironoff/AdvancedSparkAnalytics/blob/master/AdvancedAnalytics.Ch3.py

In [1]:
# import modules
import os

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib import recommendation
from pyspark.mllib.recommendation import *

In [2]:
# set configurations
conf = SparkConf().setMaster("local").setAppName("autoscrobbler")

In [3]:
# set context
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
# pathing and params
dir_data = '/data/'

user_artist_data_file = 'user_artist_data.txt'
artist_data_file = 'artist_data.txt'
artist_alias_data_file  = 'artist_alias.txt'

user_artist_full_path = os.path.join('data/',user_artist_data_file)
artist_full_path = os.path.join('data/',artist_data_file)
artist_alias_full_path = os.path.join('data/',artist_alias_data_file)

numPartitions = 2
topk = 10

In [5]:
# read user_artist_data_file into RDD (417MB file, 24MM records of users’ plays of artists, along with count)
# specifically, each row holds: userID, artistID, count
rawDataRDD = sc.textFile(user_artist_full_path, numPartitions)
rawDataRDD.cache()
rawDataRDD.take(3)

[u'1000002 1 55', u'1000002 1000006 33', u'1000002 1000007 8']

In [6]:
# read artist_data_file
rawArtistRDD = sc.textFile(artist_full_path)

In [7]:
# read artist_alias_data_file
rawAliasRDD = sc.textFile(artist_alias_full_path)

In [8]:
print(rawDataRDD.take(10))

[u'1000002 1 55', u'1000002 1000006 33', u'1000002 1000007 8', u'1000002 1000009 144', u'1000002 1000010 314', u'1000002 1000013 8', u'1000002 1000014 42', u'1000002 1000017 69', u'1000002 1000024 329', u'1000002 1000025 1']


In [9]:
print(rawArtistRDD.take(10))

[u'1134999\t06Crazy Life', u'6821360\tPang Nakarin', u'10113088\tTerfel, Bartoli- Mozart: Don', u'10151459\tThe Flaming Sidebur', u'6826647\tBodenstandig 3000', u'10186265\tJota Quest e Ivete Sangalo', u'6828986\tToto_XX (1977', u'10236364\tU.S Bombs -', u'1135000\tartist formaly know as Mat', u'10299728\tKassierer - Musik f\xfcr beide Ohren']


In [10]:
def parseArtistIdNamePair(singlePair):
   splitPair = singlePair.rsplit('\t')
   # we should have two items in the list - id and name of the artist.
   if len(splitPair) != 2:
       #print singlePair
       return []
   else:
       try:
           return [(int(splitPair[0]), splitPair[1])]
       except:
           return []


In [11]:
artistByID = dict(rawArtistRDD.flatMap(lambda x: parseArtistIdNamePair(x)).collect())

In [12]:
artist_vals = artistByID.values()

In [13]:
# print topk values from artistById
artist_vals[:topk]


[u'DJ Tiesto -  P.O.S',
 u'Air',
 u'Severed Heads',
 u'Marianne Faithfull',
 u'Peace Orchestra',
 u'Gallon Drunk',
 u'DJ Tiesto - Solid Globe',
 u'Omni Trio',
 u'The Last Poets',
 u'Rhythm & Sound']

In [14]:
def parseArtistAlias(alias):
    splitPair = alias.rsplit('\t')
    # we should have two items in the list - id and name of the artist.
    if len(splitPair) != 2:
        #print singlePair
        return []
    else:
        try:
            return [(int(splitPair[0]), int(splitPair[1]))]
        except:
            return []

In [15]:
artistAlias = rawAliasRDD.flatMap(lambda x: parseArtistAlias(x)).collectAsMap()

In [16]:
# turn the artistAlias into a broadcast variable.
# This will distribute it to worker nodes efficiently, so we save bandwidth.
artistAliasBroadcast = sc.broadcast(artistAlias)

In [17]:
artistAliasBroadcast.value.get(2097174)

1007797

In [18]:
# Print the number of records from the largest RDD, rawDataRDD
rawDataRDD.count()

24296858

In [19]:
# Sample 10% of rawDataRDD using seed 314, to reduce runtime. Call it sample.
weights = [.1, .9]
seed = 314
sample, someOtherJunk = rawDataRDD.randomSplit(weights, seed)
sample.cache()

PythonRDD[12] at RDD at PythonRDD.scala:43

In [20]:
# take the first 5 records from the sample. each row represents userID, artistID, count.
sample.take(5)

[u'1000002 1000007 8',
 u'1000002 1000055 25',
 u'1000002 1000107 4',
 u'1000002 1000113 30',
 u'1000002 1000157 2']

In [21]:
# Based on sampled data, build the matrix for model training
def mapSingleObservation(x):
    # Returns Rating object represented as (user, product, rating) tuple.
    userID, artistID, count = map(lambda lineItem: int(lineItem), x.split())
    # [add line of code here to split each record into userID, artistID, count]
    # given possible aliasing, get finalArtistID
    finalArtistID = artistAliasBroadcast.value.get(artistID)
    if finalArtistID is None:
        finalArtistID = artistID
    return Rating(userID, finalArtistID, count)

In [22]:
trainData = sample.map(lambda x: mapSingleObservation(x))
trainData.cache()

PythonRDD[14] at RDD at PythonRDD.scala:43

In [23]:
# Take the first 5 records from trainData
trainData.take(5)

[Rating(user=1000002, product=1000007, rating=8.0),
 Rating(user=1000002, product=1000055, rating=25.0),
 Rating(user=1000002, product=1000107, rating=4.0),
 Rating(user=1000002, product=1000113, rating=30.0),
 Rating(user=1000002, product=1000157, rating=2.0)]

In [28]:
# filter trainData, collecting all records where rating = 8.0
trainData.filter(lambda x: x[2] == 8.0).collect()

[Rating(user=1000002, product=1000007, rating=8.0),
 Rating(user=1000002, product=1001646, rating=8.0),
 Rating(user=1000002, product=1187, rating=8.0),
 Rating(user=1000019, product=1001236, rating=8.0),
 Rating(user=1000022, product=1000038, rating=8.0),
 Rating(user=1000022, product=1000199, rating=8.0),
 Rating(user=1000022, product=1000631, rating=8.0),
 Rating(user=1000028, product=1003465, rating=8.0),
 Rating(user=1000029, product=1000577, rating=8.0),
 Rating(user=1000035, product=1001174, rating=8.0),
 Rating(user=1000035, product=1259, rating=8.0),
 Rating(user=1000035, product=1478, rating=8.0),
 Rating(user=1000039, product=1003035, rating=8.0),
 Rating(user=1000041, product=1295531, rating=8.0),
 Rating(user=1000047, product=9918732, rating=8.0),
 Rating(user=1000056, product=1001278, rating=8.0),
 Rating(user=1000058, product=1003258, rating=8.0),
 Rating(user=1000060, product=1003827, rating=8.0),
 Rating(user=1000067, product=1284202, rating=8.0),
 Rating(user=1000088,

**The Alternating Least Squares Algo is popular for recommendation**

Parameters

`rank`  
The number of latent factors in the model

`iterations`  
The number of iterations that the factorization runs

`lambda`  
A standard overfitting parameter. Higher values resist overfitting.


In [30]:
# Train the ALS model, using seed 314, rank 10, iterations 5, lambda_ 0.01
model = ALS.trainImplicit(trainData, 10, 5, 0.01, seed=314)

In [31]:
model

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7f83121bb750>

In [32]:
# Model Evaluation

# fetch artists for a test user
testUserID = 1000002

# broadcast artistByID for speed
# [code here]
artistByIDBroadcast = sc.broadcast(artistByID)


# from trainData, collect the artists for the test user. Call the object artistsForUser.
# hint: you will need to apply .value.get(x.product) to the broadcast artistByID, where x is the Rating RDD.
# if you don't do this, you may see artistIDs. you want artist names.

# [code here]
artistsForUser = (trainData
                  .filter(lambda observation: observation.user == testUserID)
                  .map(lambda observation: artistByIDBroadcast.value.get(observation.product))
                  .collect())

In [33]:
print(artistsForUser)

[u'The Phil Collins Big Band', u'Cream', u'Iron Maiden', u'The Beatles', u'AC/DC', u'Silverchair', u'Warren Bernhardt', u'Helmet', u'The Lightning Seeds', u'Cutting Crew', u'Puddle of Mudd', u'Queens of the Stone Age', u'Soulwax', u'Harvey Danger', u'Duran Duran Feat. Melle Mel &', u'Hootie & the Blowfish', u'18 Wheeler', u'Sleeper', u'Clam Abuse', u'Eiffel 65', u'Eric Burdon & The Animals', u'Carl Douglas', u'Crazy Town', u'Mallrats', u'The Smashing Pumpkins', u'Innerzone Orchestra', u'The Waterboys', u'Procol Harum', u'The Classic Chill Out Album', u'Nat King Cole', u'NoArtist', u'The Goops', u'Rainbow', u'Bentley Ryhthm Ace', u'ZZ Top', u'Bryan Savage', u'Boney James', u'Ken Navarro', u'Keiko Matsui', u'Tuck And Patti', u'Mindi Abair', u'BWB', u'Erroll Garner', u'Jesse Cook', u'Jeff Kashiwa', u'Richard Ashcroft', u'Mr. Big', u'Lakki Patey', u'GRP All-Star Big Band', u'Alex Bugnon', u'Grover Washington Jr.', u'Shakatak', u'Jazzmasters', u'Bona Fide', u'George Howard', u'Mother Love B

In [34]:
# Make 10 recommendations for test user
num_recomm = 10
recommendationsForUser = map(lambda observation: artistByID.get(observation.product), model.call("recommendProducts", testUserID, num_recomm))
print(recommendationsForUser)

[u'Nirvana', u'Metallica', u'The Beatles', u'Nine Inch Nails', u'Pink Floyd', u'Queen', u'Tool', u'Led Zeppelin', u'Red Hot Chili Peppers', u'R.E.M.']


In [35]:
# Train a second ALS model, same as first but with rank 20
model2 = ALS.trainImplicit(trainData, 20, 5, 0.01, seed=314)

In [36]:
# Using the rank 20 model, make 10 recommendations for the same test user
recommendationsForUser2 = map(lambda observation: artistByID.get(observation.product), model2.call("recommendProducts", testUserID, num_recomm))
print(recommendationsForUser2)

[u'The Beatles', u'Red Hot Chili Peppers', u'Nine Inch Nails', u'Iron Maiden', u'Queen', u'[unknown]', u'The Smashing Pumpkins', u'The White Stripes', u'Foo Fighters', u'blink-182']
