##### In this project I work with a recommendation algorithm based on user listening data from Autoscrobbler using Alternating Least Squares (ALS)

In [4]:
# import modules
import os

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib import recommendation
from pyspark.mllib.recommendation import *
import pandas as pd

In [5]:
# configurations
conf = SparkConf().setMaster("local").setAppName("autoscrobbler")

In [6]:
# context
sc = SparkContext.getOrCreate(conf=conf)

In [7]:
# pathing and params
user_artist_data_file = 'user_artist_data.txt'
artist_data_file = 'artist_data.txt'
artist_alias_data_file  = 'artist_alias.txt'

numPartitions = 2
topk = 10

In [8]:
# reading user_artist_data_file into RDD (417MB file, 24MM records of users’ plays of artists, along with count)
# specifically, each row holds: userID, artistID, count
rawDataRDD = sc.textFile(user_artist_data_file, numPartitions)
rawDataRDD.cache()

user_artist_data.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
# inspect some records
rawDataRDD.take(2)

['1000002 1 55', '1000002 1000006 33']

In [10]:
# read artist_data_file using *textFile*
rawArtistRDD = sc.textFile(artist_data_file, numPartitions)

In [11]:
# inspect some records
rawArtistRDD.take(5)

['1134999\t06Crazy Life',
 '6821360\tPang Nakarin',
 '10113088\tTerfel, Bartoli- Mozart: Don',
 '10151459\tThe Flaming Sidebur',
 '6826647\tBodenstandig 3000']

In [12]:
# read artist_alias_data_file using *textFile*
rawAliasRDD = sc.textFile(artist_alias_data_file, numPartitions)

In [13]:
# inspect some records
rawAliasRDD.take(5)

['1092764\t1000311',
 '1095122\t1000557',
 '6708070\t1007267',
 '10088054\t1042317',
 '1195917\t1042317']

In [14]:
# first 10 records from rawDataRDD
rawDataRDD.take(10)

['1000002 1 55',
 '1000002 1000006 33',
 '1000002 1000007 8',
 '1000002 1000009 144',
 '1000002 1000010 314',
 '1000002 1000013 8',
 '1000002 1000014 42',
 '1000002 1000017 69',
 '1000002 1000024 329',
 '1000002 1000025 1']

In [16]:
def parseArtistIdNamePair(singlePair):
   splitPair = singlePair.rsplit('\t')
   # we should have two items in the list - id and name of the artist.
   if len(splitPair) != 2:
       #print singlePair
       return []
   else:
       try:
           return [(int(splitPair[0]), splitPair[1])]
       except:
           return []


In [17]:
# as list
artistByID = list(rawArtistRDD \
               .flatMap(lambda x: parseArtistIdNamePair(x))\
               .collectAsMap() \
               .values())
artistByID[:10]

['06Crazy Life',
 'Pang Nakarin',
 'Terfel, Bartoli- Mozart: Don',
 'The Flaming Sidebur',
 'Bodenstandig 3000',
 'Jota Quest e Ivete Sangalo',
 'Toto_XX (1977',
 'U.S Bombs -',
 'artist formaly know as Mat',
 'Kassierer - Musik für beide Ohren']

In [18]:
# as dict
artistByID = dict(rawArtistRDD.flatMap(lambda x: parseArtistIdNamePair(x)).collect())
list(artistByID.values())[:10]

['06Crazy Life',
 'Pang Nakarin',
 'Terfel, Bartoli- Mozart: Don',
 'The Flaming Sidebur',
 'Bodenstandig 3000',
 'Jota Quest e Ivete Sangalo',
 'Toto_XX (1977',
 'U.S Bombs -',
 'artist formaly know as Mat',
 'Kassierer - Musik für beide Ohren']

---

In [19]:
def parseArtistAlias(alias):
    splitPair = alias.rsplit('\t')
    # we should have two ids in the list.
    if len(splitPair) != 2:
        #print singlePair
        return []
    else:
        try:
            return [(int(splitPair[0]), int(splitPair[1]))]
        except:
            return []

In [20]:
artistAlias = rawAliasRDD.flatMap(lambda x: parseArtistAlias(x)).collectAsMap()

In [21]:
# turning the artistAlias into a broadcast variable.
# to distribute it to worker nodes efficiently, so we save bandwidth.
artistAliasBroadcast = sc.broadcast( artistAlias )

In [22]:
artistAliasBroadcast.value.get(2097174)

1007797

In [23]:
print( rawDataRDD.count() )

24296858


In [24]:
# Sampling 10% of rawDataRDD (to reduce runtime) 
seed = 314
weights = [.1,.9]
sample, _ = rawDataRDD.randomSplit(weights, seed)
sample.cache()

PythonRDD[14] at RDD at PythonRDD.scala:53

In [25]:
sample.take(5)

['1000002 1000014 42',
 '1000002 1000088 157',
 '1000002 1000139 56',
 '1000002 1000140 95',
 '1000002 1000210 23']

In [26]:
# matrix for model training
def mapSingleObservation(x):
    # Returns Rating object represented as (user, product, rating) tuple.
    
    # splits each record into userID, artistID, count]
    userID, artistID, count = map(lambda x: int(x), x.split())
    
    # given possible aliasing, gets finalArtistID
    finalArtistID = artistAliasBroadcast.value.get(artistID)
    if finalArtistID is None:
        finalArtistID = artistID
    return Rating(userID, finalArtistID, count)

In [27]:
trainData = sample.map(lambda x: mapSingleObservation(x))
trainData.cache()

PythonRDD[16] at RDD at PythonRDD.scala:53

In [28]:
trainData.take(5)

[Rating(user=1000002, product=1000014, rating=42.0),
 Rating(user=1000002, product=1000088, rating=157.0),
 Rating(user=1000002, product=1000139, rating=56.0),
 Rating(user=1000002, product=1000140, rating=95.0),
 Rating(user=1000002, product=1000210, rating=23.0)]

In [29]:
# training the ALS implicit model (since the measurements are activity and not ratings)

seed = 314
rank = 10
iterations = 5
alpha = 0.01

ALS_model = ALS.trainImplicit(trainData, 
                              rank=rank, 
                              iterations=iterations, 
                              alpha=alpha)

In [30]:
type(artistByID)

dict

In [31]:
type(sc.broadcast(artistByID))

pyspark.broadcast.Broadcast

In [32]:
# Model Evaluation

# fetch artists for a test user
testUserID = 1000002

# broadcast artistByID for speed
artistByIDBroadcast = sc.broadcast( artistByID )

# collect artists from train data for test user
artistsForUser = (trainData
                  .filter(lambda observation: observation.user == testUserID) \
                  .map(lambda observation: artistByIDBroadcast.value.get(observation.product)) \
                  .collect())

In [33]:
# artist listens for testUserID = 1000002
print("Without None objects", [i for i in artistsForUser if i])  

Without None objects ['Café Del Mar', 'Eric Clapton', 'Eurythmics']


In [34]:
# 10 recommendations for testUserID = 1000002
num_recomm = 600 # this filters down to 10 after filtering Nones
recommendationsForUser = map(lambda observation: artistByID.get(observation.product), ALS_model.call("recommendProducts", testUserID, num_recomm))
print([x for x in recommendationsForUser if x is not None][:10])

['Eric Clapton', 'Enigma', 'Scorpions', '植松伸夫', 'Gary Jules', 'Eurythmics', 'Dark Tranquillity', 'Elvis Costello', 'Joss Stone', 'Nena']


In [35]:
# Train a second ALS model

seed = 311
rank2 = 20
iterations = 5
alpha = 0.01

ALS_model2 = ALS.trainImplicit(trainData, 
                              rank=rank2, 
                              iterations=iterations, 
                              alpha=alpha)

In [41]:
# 10 recommendations for the same user with the rank 20 model
recommendationsForUser_rank20 = map(lambda observation: artistByID.get(observation.product), ALS_model2.call("recommendProducts", testUserID, num_recomm))
print([x for x in recommendationsForUser_rank20 if x is not None][:10])

['Eric Clapton', 'Dark Tranquillity', 'Scorpions', 'Enigma', 'Gary Jules', 'Eurythmics', 'Elvis Costello', 'Echo & the Bunnymen', 'Joss Stone', 'Nena']


##### The two models perform almost identically. Both the rank 10 and rank 20 models included two of the artist listens for the testUserID. 