In [1]:
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating

In [2]:
# load data 
user_aritist_data = sc.textFile(r'data/user_artist_data_small.txt')
artist_data = sc.textFile(r'data/artist_data_small.txt')
artist_alias = sc.textFile(r'data/artist_alias_small.txt')

In [3]:
# check the statistics of UserID and ArtistID
print('UserID: ', user_aritist_data.map(lambda line: int(line.split(" ")[0])).stats())
print('ArtistID：', user_aritist_data.map(lambda line: int(line.split(" ")[1])).stats())
print('Count: ', user_aritist_data.map(lambda line: int(line.split(" ")[2])).stats())

#The computed statistics that are printed reveal that the maximum user and artist IDs are 2288164 and 10788218, respectively.
#These are comfortably smaller than 2147483647. No additional transformation will be necessary to use these IDs.

UserID:  (count: 49481, mean: 1328420.19494351, stdev: 452986.735676, max: 2288164.0, min: 1000647.0)
ArtistID： (count: 49481, mean: 2003155.0297285826, stdev: 2489584.20712, max: 10788218.0, min: 1.0)
Count:  (count: 49481, mean: 130.5757967704775, stdev: 3034.35409229, max: 439771.0, min: 1.0)


In [4]:
# map the artist names that correspond to the numeric ID
def convert(line):
    id, name = line.split("\t")
    if not name or not id:      # remove record when there is missing value in line
        return
    id = int(id)
    name = name.strip()
    return (id, name)
artist_data_new = artist_data.map(convert)
artist_data_new.take(10)

[(1240105, 'André Visior'),
 (1240113, 'riow arai'),
 (1240132, 'Outkast & Rage Against the Machine'),
 (6776115, '小松正夫'),
 (1030848, "Raver's Nature"),
 (6671601, 'Erguner, Kudsi'),
 (1106617, 'Bloque'),
 (1240185, 'Lexy & K. Paul'),
 (6671631, 'Rev. W.M. Mosley'),
 (6671632, 'Labelle, Patti')]

In [5]:
# map the artist alias.txt file so that you can get the misspelled or non-standard names to be mapped to the 
#artist’s canonical names
def alias(line):
    id,alias = line.split('\t')
    if not id or not alias:
        return
    return (int(id), int(alias))
artist_alias_new = artist_alias.map(alias).collectAsMap()   #create a dict so that it can be indexed to replace misspelled names
alias = sc.broadcast(artist_alias_new)

In [6]:
# Get the dataset for the Spark ALS implementation
def train(line):
    userID, artistID, count = line.split(" ")
    userID, artistID, count = int(userID), int(artistID), int(count)
    artistID = alias.value.get(artistID,artistID)   # Convert artist IDs to a canonical ID 
    return  (userID, artistID, count)
#convert the data into a Rating object
train_data = user_aritist_data.map(train).map(lambda x: Rating(x[0],x[1],x[2]))

In [7]:
train_data.first()

Rating(user=1059637, product=1000010, rating=238.0)

In [8]:
# Build an initial model for ALS using the trainImplicit function
model = ALS.trainImplicit(train_data, 10, 5, 0.01, 1)

In [9]:
model

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x5b2aac8>

In [10]:
#spot checking recommendations
def recommend(userID):
    artofuser = user_aritist_data.map(lambda line:line.split(" ")).filter(lambda x: x[0]== str(userID))
    artproducts = set(artofuser.map(lambda x: int(x[1])).collect()) # the unique artistID that the user have listened
    # Extract the IDs and names
    products = artist_data_new.filter(lambda x:x[0] in artproducts).map(lambda x:(x[0],x[1])).collect()
    print("Listened products by user: " + str(userID), products[:50]) # print 50 listened products
    
    #Get 10 the recommended products 
    rec = model.recommendProducts(userID,50) 
    recproducts = set([x[1] for x in rec])
    rec_products = artist_data_new.filter(lambda x:x[0] in recproducts).map(lambda x:(x[0], x[1])).collect()
    # print out the recommended products
    print("Recommended products for user: " + str(userID), rec_products)
    
    #check the number of recommend products that have listened by the user
    print("Number of recommend products in listened products: ", sum([x in products for x in rec_products]))

In [11]:
# userID: 1059637
recommend(1059637)

Listened products by user: 1059637 [(1002584, 'Nena'), (1247913, 'JamisonParker'), (1257062, 'The Spill Canvas'), (1257410, 'tomandandy (ft. Kip Pardue)'), (1260489, 'The Exciters'), (1260572, 'Nightmare Of You'), (1261496, 'J-Kwon'), (6992072, 'angle'), (5496, 'Echo & the Bunnymen'), (1150039, 'Letter Kills'), (1283493, 'An Angle'), (1006354, 'Pedro the Lion'), (1085052, 'Cordero'), (78, 'Sublime'), (1233389, 'The American Analog Set'), (1234850, 'The Hollies'), (1009156, 'Mae'), (1003853, 'Les Savy Fav'), (1044920, 'Matchbook Romance'), (5659, 'Midtown'), (1000428, 'Blind Melon'), (1000320, 'MxPx'), (1314259, "I Can Make a Mess Like Nobody's Business"), (1005990, 'Motion City Soundtrack'), (6790420, 'The Academy Is'), (1001233, 'Sheryl Crow'), (1003727, 'Yeah Yeah Yeahs'), (1019931, 'Usher'), (1009499, 'ebaumsworld.com'), (6957310, 'Starting LineLine'), (1006594, 'Norma Jean'), (1002225, 'Sunny Day Real Estate'), (1002289, 'Guster'), (1006411, 'Built to Spill'), (1000305, 'The Lightn

In [12]:
# userID: 2023686
recommend(2023686)

Listened products by user: 2023686 [(1240132, 'Outkast & Rage Against the Machine'), (1240262, 'The Gufs'), (6828988, 'Southern Conference Featuring Dr. Ace'), (1002584, 'Nena'), (1240603, 'The Wake'), (1241016, 'The Ducky Boys'), (1241122, 'The Glove'), (7031362, 'Piolts vs. Aeroplane'), (7031363, 'Grinning Idiots'), (7031364, 'DJ Krush Featuring Boss The Mc'), (7031365, 'Beavis & Butt-head (Mike Judge)'), (10025458, 'Emil Hyde'), (1244907, 'Lindsay Lohan'), (1245026, 'The Pimps'), (1245064, 'The Patti Smith Group'), (1137400, 'Chaise Lounge'), (1068227, 'Bliss 66'), (604, 'Ten Foot Pole'), (10027022, 'E_O_H'), (1044471, 'Eugene Chadbourne'), (1246307, 'Bash & Pop'), (10027052, 'Chasing Zero'), (6828997, 'Chuck Girard with Paul Johnson'), (1246478, 'The Choir'), (1000193, 'Saliva'), (1086845, 'Hanover Saints'), (6962264, 'Dale Thompson & The Kentucky Cadillacs'), (1247017, 'Nusrat Fateh Ali Khan & Party & Diamanda Gallas'), (1839, 'Alabama 3'), (1116416, 'Combine'), (1248390, 'The Sta

In [13]:
# userID: 1026084
recommend(1026084)

Listened products by user: 1026084 [(1030848, "Raver's Nature"), (1240510, 'The Mercury Program'), (6990766, 'Phil Hendrie - 11/06/98'), (1244952, 'The Fiery Furnaces'), (10299597, 'Egodog'), (1246719, 'The Loud Family'), (1246950, 'The Roots of Orchis'), (1246957, 'The Intima'), (1247079, 'The Plus Ones'), (1247732, 'Williamson'), (1248245, 'The Three Amigos'), (1248866, 'New Artist (483)'), (10584463, 'A/D Abe Dolinger'), (1251900, 'The Ex'), (10436601, 'Michael Tiernan'), (135, 'Motorbass'), (1004461, 'Gary Jules'), (1252607, 'bexar bexar'), (9934695, 'Eric in Frisco'), (7031948, 'James Shadko'), (1253284, 'Café Del Mar'), (1000658, 'Squeeze'), (1234278, 'The (International) Noise Conspiracy'), (1255551, 'Stan Getz & Joao Gilberto Featuring Antonio Carlos Jobim'), (1033983, 'Calendar Girl'), (1257059, 'The Bacon Brothers'), (1257729, 'Béla Bartók'), (1258010, 'Archie Bronson Outfit'), (1259199, 'Roger Clyne & the Peacemakers'), (1554, 'Cave In'), (1259410, 'The Twin Atlas'), (126054

In [14]:
#Using the Cross-validation loop to study performance of our recommender
#split data
split = [0.1]*10
train = train_data.randomSplit(split, seed = 111)
sum = 0
# split into 5 folder to do cross_validation
for i in range(len(train)):
    test = train[i]
    testData = test.map(lambda x:(x[0],x[1])).cache()
    trainData = sc.union(train[0:i]+train[i+1:])
    #parameters
    rank, iterations, lam, alpha = 10, 10, 0.01, 1
    train_model = ALS.trainImplicit(trainData, rank, iterations, lam, alpha)
    prediction = train_model.predictAll(testData).map(lambda r: ((r[0],r[1]),r[2]))  #prediction value for testing data
    # Evaluate the model on testing data
    ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(prediction)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print("Mean Squared Error for (%s %s %s %s) = %s" % (rank, iterations, lam, alpha, MSE))
    sum += MSE
print("The average Mean Squared Error for (%s %s %s %s) = %s" % (rank, iterations, lam, alpha, sum/10))

Mean Squared Error for (10 10 0.01 1) = 876718.4995627955
Mean Squared Error for (10 10 0.01 1) = 750506.8470843043
Mean Squared Error for (10 10 0.01 1) = 76078114.3264593
Mean Squared Error for (10 10 0.01 1) = 4397547.34793695
Mean Squared Error for (10 10 0.01 1) = 817284.3237705904
Mean Squared Error for (10 10 0.01 1) = 75140715.43301444
Mean Squared Error for (10 10 0.01 1) = 2223835.3984293407
Mean Squared Error for (10 10 0.01 1) = 2454363.652959341
Mean Squared Error for (10 10 0.01 1) = 14390667.10041674
Mean Squared Error for (10 10 0.01 1) = 581811.4237929212
The average Mean Squared Error for (10 10 0.01 1) = 17771156.435342677


In [15]:
def cross_validation(data, rank, iterations, alpha, lam):
    split = [0.1]*10
    train = data.randomSplit(split,seed=1234)
    sum = 0
    # split into 5 folder to do cross_validation
    for i in range(len(train)):
        test = train[i]
        testData = test.map(lambda x:(x[0],x[1])).cache()
        trainData = sc.union(train[0:i]+train[i+1:])
        train_model = ALS.trainImplicit(trainData, rank, iterations, lam, alpha)
        prediction = train_model.predictAll(testData).map(lambda r: ((r[0],r[1]),r[2]))  #prediction value for testing data
        # Evaluate the model on testing data
        ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(prediction)
        MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
        sum += MSE
    print("The average Mean Squared Error for (%s %s %s %s) = %s" % (rank, iterations, lam, alpha, sum/10))
    return sum/10

#Playing with the hyper-parameters
ranks = [10, 50]
lams = [1.0, 0.0001]
alphas = [1, 40]
MIN = float('inf')
iterations = 10
for rank in ranks:
    for lam in lams:
        for alpha in alphas:
            MSE = cross_validation(train_data, rank, iterations, alpha, lam)
            if MSE<MIN:
                MIN = MSE
                params = (rank, iterations, lam, alpha)
print("The best model with params: " + str(params) +",and MSE is " + str(MIN))

The average Mean Squared Error for (10 10 1.0 1) = 18470813.814283602
The average Mean Squared Error for (10 10 1.0 40) = 18470808.96186333
The average Mean Squared Error for (10 10 0.0001 1) = 18470726.10576769
The average Mean Squared Error for (10 10 0.0001 40) = 18470727.610244505
The average Mean Squared Error for (50 10 1.0 1) = 18470840.306206394
The average Mean Squared Error for (50 10 1.0 40) = 18470841.475081377
The average Mean Squared Error for (50 10 0.0001 1) = 18470928.53915021
The average Mean Squared Error for (50 10 0.0001 40) = 18470929.60796326
The best model with params: (10, 10, 0.0001, 1),and MSE is 18470726.10576769
