In [1]:
from pyspark.mllib.recommendation import *
import random
from operator import *

In [2]:
# Raw interaction log, read as an RDD of text lines (each line is parsed by parser1).
userItemFile = sc.textFile(name="tianchi_fresh_comp_train_user.txt")

In [3]:
def parser1(s):
    """Parse one tab-separated line into a tuple of its first three fields as ints.

    Columns beyond the third are ignored. A line with fewer than three fields
    raises IndexError; a non-integer field raises ValueError.
    NOTE(review): presumably (user_id, item_id, behavior_type) for this dataset
    -- confirm against the file's schema.
    """
    fields = s.split("\t")
    return tuple(int(fields[col]) for col in range(3))

In [4]:
# Collapse the log to one (user, item, total) triple per (user, item) pair,
# where total is the sum of the third parsed field over all matching rows.
userItem = (
    userItemFile
    .map(parser1)
    .map(lambda rec: ((rec[0], rec[1]), rec[2]))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
)

In [5]:
# 60/20/20 train/validation/test split of the aggregated ratings.
SPLIT_WEIGHTS = [6, 2, 2]
SPLIT_SEED = 13

# Persist the parent first. Each randomSplit child samples by re-evaluating the
# parent on its own, so without this every split re-runs the full
# read/parse/shuffle lineage. The three splits are only guaranteed disjoint and
# exhaustive when every re-evaluation yields the same rows in the same order.
userItem.cache()

# Cache the splits *before* the first action so the counts below and the later
# grid search reuse them instead of recomputing. cache() returns the RDD itself.
trainData, validationData, testData = (
    split.cache() for split in userItem.randomSplit(SPLIT_WEIGHTS, SPLIT_SEED)
)

print(trainData.count())
print(validationData.count())
print(testData.count())

238376
79463
79437


PythonRDD[11] at RDD at PythonRDD.scala:48

In [6]:
import math

In [7]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks, seed=99):
    """
    Grid-search ALS hyper-parameters and return the model with the lowest
    RMSE on the hold-out (validation) data.

    Every combination of a rank from ``ranks`` and a regularization value from
    ``reg_param`` is trained on ``train_data`` and scored on ``validation_data``.
    Each result is printed, followed by the winning combination.

    Parameters
    ----------
    train_data : RDD of (userID, productID, rating) tuples used to fit each model.
    validation_data : RDD of (userID, productID, rating) tuples used for scoring.
    num_iters : int, ALS iterations per fit.
    reg_param : iterable of regularization values (passed to ALS as ``lambda_``).
    ranks : iterable of latent-factor counts to try.
    seed : int, random seed forwarded to ``ALS.train``. Defaults to 99, the
        value that used to be hard-coded.

    Returns
    -------
    The best model returned by ``ALS.train``. Returns None if ``ranks`` or
    ``reg_param`` is empty, or if no combination could score any validation pair.
    """
    # Both RDDs depend only on validation_data, so build them once rather than
    # once per (rank, reg) combination.
    valid_pairs = validation_data.map(lambda p: (p[0], p[1]))
    valid_ratings = validation_data.map(lambda r: ((r[0], r[1]), r[2]))

    min_error = float('inf')
    best_rank = -1
    best_regularization = None
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            model = ALS.train(
                ratings=train_data,    # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=seed)
            predictions = model.predictAll(valid_pairs).map(lambda r: ((r[0], r[1]), r[2]))
            # Inner join: validation pairs the model returned no prediction for
            # are dropped, so the RMSE covers only the pairs it could score.
            ratesAndPreds = valid_ratings.join(predictions)
            # stats() yields count and mean in one pass. An empty RDD reports a
            # mean of 0.0, which would look like a perfect model, so skip it.
            sq_err_stats = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).stats()
            if sq_err_stats.count() == 0:
                print('{} latent factors, lambda {}: no validation pairs could be scored, skipped'.format(rank, reg))
                continue
            error = math.sqrt(sq_err_stats.mean())
            print('{} latent factors, lambda {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and lambda {}'.format(best_rank, best_regularization))
    return best_model

In [10]:
# hyper-param config
num_iterations = 5
ranks = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
reg_params = [0.01]

In [11]:
# Run the grid search; the lowest-validation-RMSE model is kept.
final_model = train_ALS(
    train_data=trainData,
    validation_data=validationData,
    num_iters=num_iterations,
    reg_param=reg_params,
    ranks=ranks,
)

2 latent factors: validation RMSE is 6.80573981701
4 latent factors: validation RMSE is 6.00615239206
6 latent factors: validation RMSE is 5.64247588863
8 latent factors: validation RMSE is 5.38437091669
10 latent factors: validation RMSE is 5.27461613662
12 latent factors: validation RMSE is 5.20068317767
14 latent factors: validation RMSE is 5.1696832275
16 latent factors: validation RMSE is 5.08510276692
18 latent factors: validation RMSE is 5.06791137454
20 latent factors: validation RMSE is 5.06389790431

The best model has 20 latent factors


In [12]:
# Show the top-N recommended items for one example user.
SAMPLE_USER_ID = 104448961
TOP_N = 5

temp = final_model.recommendProducts(SAMPLE_USER_ID, TOP_N)
# Keep only the item id (field 1) of each recommendation, in ranked order.
lists = [rec[1] for rec in temp]
for position, item_id in enumerate(lists, 1):
    print("Item " + str(position) + ": " + str(item_id))

Item 1: 299050468
Item 2: 287845290
Item 3: 258745390
Item 4: 148597624
Item 5: 94776053


In [15]:
# make prediction using test data
test_data = testData.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result
ratesAndPreds = testData.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# get the RMSE
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))

('The out-of-sample RMSE of rating predictions is', 5.2407)
