In [1]:
import pyspark
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.mllib.clustering import KMeans, KMeansModel
from settings import *

# Read data, Split to train test

In [2]:
# Sorting data by time
rdd =  sc.textFile(RATINGS_10M).map(lambda line: [float(x) for x in line.split('::')]).sortBy(lambda x: x[3],False)
size = rdd.count()

In [3]:
# Processing data to structure: Rating(user=62510, product=34148, rating=3.0)
train = rdd.zipWithIndex().filter(lambda x: x[-1] < size*0.6).map(lambda x: Rating(int(x[0][0]), int(x[0][1]), x[0][2]))
testdata = rdd.zipWithIndex().filter(lambda x: x[-1] > size*0.6).map(lambda x: Rating(int(x[0][0]), int(x[0][1]), x[0][2]))
print "read data finished"

read data finished


# Compute user bias

In [4]:
'''
test = sc.parallelize([(1,3), (3,4), (1,3), (1,4)])

print test.map(lambda data: (data[0], 1)).reduceByKey(lambda a,b: a+1).collectAsMap()
#Test counter
'''
user_count = train.map(lambda data: (data[0], 1)).reduceByKey(lambda a,b: a+1).collectAsMap()
score_mean = train.map(lambda data: data[2]).mean()
user_score_bias_sum = train.map(lambda data: (data[0], data[2] - score_mean)).reduceByKey(lambda a,b: a+b).collectAsMap()
user_bias = {}
for key in user_count.keys():
    user_bias[key] = user_score_bias_sum[key]/user_count[key]
print "user bias finished"

user bias finished


# Compute movie bias

In [5]:
def test(rating):
    print user_bias[rating.user]
    return (rating.product, rating.rating - score_mean)
movie_count = train.map(lambda rating: (rating.product, 1)).reduceByKey(lambda a,b: a+1).collectAsMap()
movie_score_bias_sum = train.map(lambda rating: (rating.product, rating.rating - score_mean  - user_bias[rating.user] )).reduceByKey(lambda a,b: a+b).collectAsMap()
#lambda rating: (rating.product, rating.rating - score_mean  - user_bias[rating.user] )

movie_bias = {}
for key in movie_count.keys():
    movie_bias[key] = movie_score_bias_sum[key]/movie_count[key]
print "product bias finished"

product bias finished


In [6]:
'''
test = sc.parallelize([(1,3), (3,4), (1,3), (1,4)])
def f(x): return (x[0], x[1]**2)
print test.map(f).collect()
'''

train_rm_user = train.map(lambda rating: Rating(rating.user, rating.product, rating.rating - user_bias[rating.user]))
train_rm_movie = train.map(lambda rating: Rating(rating.user, rating.product, rating.rating - user_bias[rating.user] - movie_bias[rating.product]))
print "remove bias finish"

remove bias finish


In [7]:
#print movie_count
#print movie_score_bias_sum
#print movie_bias
print train.first()
print train_rm_user.first()
print train_rm_movie.first()

Rating(user=62510, product=34148, rating=3.0)
Rating(user=62510, product=34148, rating=2.5208408027796696)
Rating(user=62510, product=34148, rating=2.0424981526183856)


# KMeans

In [8]:
user_train = rdd.zipWithIndex().filter(lambda x: x[-1] < size*0.6).map(lambda x: (x[0][0], float(x[0][2])))
product_train = rdd.zipWithIndex().filter(lambda x: x[-1] < size*0.6).map(lambda x: (x[0][1], float(x[0][2])))
print "train user"
user_clusters = KMeans.train(user_train, int(len(user_bias)/2), maxIterations=20, initializationMode="random")
print "train cluster"
product_clusters = KMeans.train(product_train, int(len(movie_bias)/2), maxIterations=20, initializationMode="random")

print "train finished"

train user
train cluster
train finished


In [None]:
def convert(data):
    user = predict((data[0], float(data[2])), user_clusters)
    product = predict((data[1], float(user[1])), product_clusters)
    return "::".join( (str(user[0]), str(product[0]), str(product[1])) )
def predict(point, clusters):
    category = clusters.predict(point)
    center = clusters.centers[category]
    #print category
    return (category, center[1])
train_reduce = rdd.zipWithIndex().filter(lambda x: x[-1] < size*0.6)\
        .map(lambda x: (x[0][0], x[0][1], x[0][2])).map(convert).map(lambda x: Rating(int(x[0]), int(x[1]), x[2]))
validate_reduce = rdd.zipWithIndex().filter(lambda x: size*0.6<=x[-1] < size*0.8)\
        .map(lambda x: (x[0][0], x[0][1], x[0][2])).map(convert).map(lambda x: Rating(int(x[0]), int(x[1]), x[2]))
test_reduce = rdd.zipWithIndex().filter(lambda x: size*0.6<=x[-1] < size*0.8)\
        .map(lambda x: (x[0][0], x[0][1], x[0][2])).map(convert).map(lambda x: Rating(int(x[0]), int(x[1]), x[2]))

In [13]:
print "save model"
user_clusters.save(sc, "user_Model")
product_clusters.save(sc, "product_Model")

save model


In [None]:
print "save data"
print "save train"
train_reduce.cache()
train_reduce.saveAsTextFile("validate.dat")
'''
train_reduce_list = train_reduce.collect()
with open(DATA_FOLDER + "train.dat", "wb") as f:
    for rating in train_reduce_list:
        f.write(str(rating.user) + "::" + str(rating.product) + "::" + str(rating.rating) + "\n")
'''
print "save train finished"      

save data
save train


In [None]:
print "save validate"
validate_reduce.cache()
validate_reduce_list = validate_reduce.collect()
with open(DATA_FOLDER + "validate.dat", "wb") as f:
    for rating in validate_reduce:
        f.write(str(rating.user) + "::" + str(rating.product) + "::" + str(rating.rating) + "\n")
print "save validate finished"        

In [None]:
print "save test"
test_reduce.cache()
test_reduce = test_reduce.collect()
with open(DATA_FOLDER + "test.dat", "wb") as f:
    for rating in test_reduce:
        f.write(str(rating.user) + "::" + str(rating.product) + "::" + str(rating.rating) + "\n")
print "save test finished"

In [None]:
print train_reduce.first()

In [None]:
# Training ALS Model
rank = 10
numIterations = 10
model = ALS.train(train, rank, numIterations)

# Model Evaluation

In [41]:
# Trying model evaluation



In [42]:
predictions = model.predictAll(test).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = testdata.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

((54040, 912), 4.367416929660967)
((31630, 1412), (4.0, 4.163697460854653))


In [43]:
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 1.00137982702


# Recommendation System

In [None]:
# Trying recommendation System 

#testdata = train.map(lambda p: (p[0], p[1]))
testUsers = train.map(lambda p: p[0])

In [None]:
model.recommendProducts(testUsers, 100)

#predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
#ratesAndPreds = train.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)


In [None]:
#MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
#print("Mean Squared Error = " + str(MSE))