In [1]:
import os, sys

In [2]:
from random import randrange
from operator import itemgetter
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.types import Row
from pyspark.sql import functions as F

In [3]:
conf = SparkConf()
conf.set("spark.app.name", "Recommender");
conf.set("spark.master", "spark://spark-master:7077");
conf.set("spark.executor.memory", "6g")
conf.set("spark.ui.port", "8080");
sc = SparkContext(conf=conf).getOrCreate()

In [4]:
rawArtistData = sc.textFile("resources/lastfm/artist_data.txt")
rawArtistAlias = sc.textFile("resources/lastfm/artist_alias.txt")
rawUserArtistData = sc.textFile("resources/lastfm/user_artist_data.txt")

In [5]:
def buildRatings(rawUserArtistData, bArtistAlias):
    def getArtistRating(line):
        (userID, artistID, count) = map(lambda x: int(x), line.split(' '))
        try:
            finalArtistID = bArtistAlias.value[artistID]
        except KeyError:
            finalArtistID = artistID
        return Rating(userID, finalArtistID, count)

    return rawUserArtistData.map(lambda line: getArtistRating(line))

In [6]:
def representsInt(s):
    try:
        int(s)
        return True
    except ValueError:
        return False

In [7]:
def buildArtistAlias(rawArtistAlias):
    '''
        - convert ther rawArtistData into tuples of (aliasID, artistID)
        - filter all bad lines
    '''
    return rawArtistAlias \
        .map(lambda line: line.split('\t')) \
        .filter(lambda artist: artist[0] and representsInt(artist[0])) \
        .map(lambda artist: (int(artist[0]), int(artist[1]))) \
        .collectAsMap()

In [8]:
def buildArtistByID(rawArtistData):
    '''
        - convert ther rawArtistData into tuples of (artistID, artistName)
        - filter all bad lines
    '''
    return rawArtistData \
        .map(lambda line: line.split('\t', 1)) \
        .filter(lambda artist: artist[0] and representsInt(artist[0])) \
        .map(lambda artist: (int(artist[0]), artist[1].strip()))

In [9]:
sqlContext = SQLContext(sc)

In [10]:
artistByID = buildArtistByID(rawArtistData)
artistAlias = buildArtistAlias(rawArtistAlias)

In [None]:
artistAlias

In [11]:
list(artistAlias.items())[0:10]

[(1092764, 1000311),
 (1095122, 1000557),
 (6708070, 1007267),
 (10088054, 1042317),
 (1195917, 1042317),
 (1112006, 1000557),
 (1187350, 1294511),
 (1116694, 1327092),
 (6793225, 1042317),
 (1079959, 1000557)]

In [12]:
artistByID.take(10)

[(1134999, '06Crazy Life'),
 (6821360, 'Pang Nakarin'),
 (10113088, 'Terfel, Bartoli- Mozart: Don'),
 (10151459, 'The Flaming Sidebur'),
 (6826647, 'Bodenstandig 3000'),
 (10186265, 'Jota Quest e Ivete Sangalo'),
 (6828986, 'Toto_XX (1977'),
 (10236364, 'U.S Bombs -'),
 (1135000, 'artist formaly know as Mat'),
 (10299728, 'Kassierer - Musik für beide Ohren')]

In [13]:
bArtistAlias = sc.broadcast(artistAlias)
trainData = buildRatings(rawUserArtistData, bArtistAlias).cache()

In [14]:
trainData

PythonRDD[8] at RDD at PythonRDD.scala:53

In [16]:
trainData.cache()

PythonRDD[8] at RDD at PythonRDD.scala:53

In [15]:
trainData.first()

Rating(user=1000002, product=1, rating=55.0)

In [18]:
model = ALS.trainImplicit(ratings=trainData, rank=10, iterations=5, lambda_=0.01, alpha=1.0)

In [19]:
trainData.unpersist()

PythonRDD[8] at RDD at PythonRDD.scala:53

In [20]:
print(model.userFeatures().mapValues(lambda v: ", ".join( map(lambda x: str(x),v) )).first())

(90, '-0.314458429813385, 0.37867167592048645, 0.6905015707015991, -0.47901904582977295, 1.0030543804168701, 0.7148472666740417, 0.05969858169555664, 0.18951988220214844, 0.28648626804351807, -0.21078184247016907')


In [51]:
userID = 2093760
recommendations = model.recommendProducts(userID, 5)
for val in recommendations:
    print(val, val.product)

Rating(user=2093760, product=2814, rating=0.025538512837930714) 2814
Rating(user=2093760, product=1300642, rating=0.025242757394979408) 1300642
Rating(user=2093760, product=1001819, rating=0.02488927245684423) 1001819
Rating(user=2093760, product=1003249, rating=0.024226043130549567) 1003249
Rating(user=2093760, product=4605, rating=0.024202361443191288) 4605


In [52]:
recommendedProductIDs = list(map(lambda rec: rec.product, recommendations))

In [53]:
rawArtistForUser = rawUserArtistData\
    .map(lambda x: x.split(' '))\
    .filter(lambda x: int(x[0]) == userID)

In [54]:
existingProducts = rawArtistForUser.map(lambda x: int(x[1])).collect()
for val in existingProducts:
    print(val)

1180
1255340
378
813
942


In [55]:
existingArtists = artistByID.filter(lambda artist: artist[0] in existingProducts).collect()
for val in existingArtists:
    print(val)

(1180, 'David Gray')
(378, 'Blackalicious')
(813, 'Jurassic 5')
(1255340, 'The Saw Doctors')
(942, 'Xzibit')


In [56]:
recommendedArtists = artistByID.filter(lambda artist: artist[0] in recommendedProductIDs).collect()
for val in recommendedArtists:
    print(val)

(2814, '50 Cent')
(4605, 'Snoop Dogg')
(1003249, 'Ludacris')
(1001819, '2Pac')
(1300642, 'The Game')
