In [25]:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import os 


# Stop any SparkContext that is already bound to `sc` (from a previous run of
# this cell, or one injected by a pyspark-launched kernel) so a new context can
# be created with the settings below. On a fresh kernel `sc` is not defined
# yet, and there is nothing to stop.
try:
    sc.stop()
except NameError:
    pass

conf = SparkConf().setMaster("local[*]").setAppName("movierecommand")
sc = SparkContext(conf=conf)
sc.setCheckpointDir("checkpoint")

# A notebook has no __file__, so the data set is located relative to the
# current working directory. The trailing separator is kept because later
# cells build file paths with plain `data_path + "<file>"` concatenation.
dir_path = os.path.realpath(os.getcwd())
data_path = os.path.join(dir_path, "ml-100k", "")

data = sc.textFile(data_path + "u.data")
# Each line is tab-separated; the first three fields are used as
# user id, movie id, rating. Any further fields are ignored.
ratings = (
    data.map(lambda line: line.split('\t'))
        .map(lambda fields: Rating(int(fields[0]), int(fields[1]), float(fields[2])))
        .cache()
)
                     

# Training  and Fitting with ALS
Collaborative filtering is commonly used in recommendation systems. It recommends items with high similarity, which can be computed from user/item latent factors. A matrix factorization model is used to measure that similarity. Let us walk through the math. The following matrix is the user–item rating matrix ($r$) built from the ratings loaded in the previous block (rows are users, columns are items); it is full of unknown values, since most users have not watched most items.

\begin{bmatrix}5 & 5 & ? & 5 \\5 & ? & ? & 4\\3 & ? & 1 & ?\\? & 5 & ? & 4 \end{bmatrix} 

We assume each entry of $r$ is the dot product of a user vector ($u$) and an item vector ($p$):

\begin{bmatrix}p_0u_0 & p_1u_0 & p_2u_0 & p_3u_0 \\p_0u_1 & p_1u_1 & p_2u_1 & p_3u_1\\p_0u_2 & p_1u_2 & p_2u_2 & p_3u_2\\p_0u_3 & p_1u_3 & p_2u_3 & p_3u_3 \end{bmatrix} 

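Those unknown values can be filled in by matrix decomposition, i.e. by finding $p$ and $u$. This is done by minimizing the error $\sum_{ij} (r_{ij} - p_j u_i)^2$ over the known ratings (user $i$, item $j$), with some regularization. 
Alternating least squares (ALS) performs this decomposition by alternating between the two sets of factors: it holds the item vectors $p_j$ fixed and solves a least-squares problem for the user vectors $u_i$, then holds $u_i$ fixed and solves for $p_j$, and repeats this for a number of iterations.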


In [34]:
# One seed is used for both the train/test split and the ALS initialisation so
# that re-running the cell is intended to reproduce the same model and MSE.
seed = 7856

# Hold out 30% of the ratings for evaluation.
train, test = ratings.randomSplit([0.7, 0.3], seed)

rank = 10            # number of latent factors per user / item
numIterations = 20   # number of ALS iterations
model = ALS.train(train, rank, numIterations, seed=seed)

# Ask the model for a rating for every (user, movie) pair in the test set and
# key both the predictions and the true ratings by that pair.
testdata = test.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))

# The join keeps only pairs present on both sides, i.e. test ratings for which
# the model returned a prediction; each value is (true rating, predicted rating).
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 1.28703026974


# Recommend movies for a user with the trained model  

In [29]:
# Load movie names: build a lookup from movie id to title.
# Each line of u.item is pipe-delimited; the first field is used as the
# integer id and the second as the title.
# NOTE(review): the file is read with the platform default encoding; if titles
# contain non-ASCII characters an explicit encoding may be needed on Python 3 - confirm.
moviedict = {}
with open(data_path + "u.item", "r") as f:  # `with` closes the file when done
    for line in f:
        # Skip blank lines (e.g. a trailing newline at end of file), which
        # would otherwise fail the int() conversion below.
        if not line.strip():
            continue
        col = line.split("|")
        moviedict[int(col[0])] = col[1]
    

In [35]:
# Print the 10 movies the trained model scores highest for one user.
userID = 196
res = model.recommendProducts(userID, 10)
for rec in res:  # named `rec` so the stdlib module name `re` is not shadowed
    # Index 1 of each result is the movie id and index 2 the predicted score.
    # print() is called as a function so the cell runs on Python 2 and 3 alike.
    print(moviedict[int(rec[1])] + "score=" + str(rec[2]))

War, The (1994)score=10.6412553652
Balto (1995)score=9.86760668309
Road to Wellville, The (1994)score=8.91258831862
Foreign Correspondent (1940)score=8.12708785494
Ponette (1996)score=7.5406304967
In the Mouth of Madness (1995)score=7.53830845181
Crash (1996)score=7.29768287693
Ninotchka (1939)score=7.24622395506
Orlando (1993)score=7.23525782614
Once Were Warriors (1994)score=7.19225420347


# Reference
1. https://www.youtube.com/watch?v=MDo2WanJ2FM
2. https://www.kdnuggets.com/2017/08/recommendation-system-algorithms-overview.html
3. https://spark.apache.org/docs/2.2.0/mllib-collaborative-filtering.html